diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index 8f8befd83480..7d2f48837f78 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -648,6 +648,13 @@ public enum ConfVars { "hive.txn.acid.metrics.delta.pct.threshold", 0.01f, "Percentage (fractional) size of the delta files relative to the base directory. Deltas smaller than this threshold " + "count as small deltas. Default 0.01 = 1%.)"), + METASTORE_JDBC_SLOW_QUERIES("metastore.jdbc.execution.logSlowQueriesThreshold", "metastore.jdbc.execution.logSlowQueriesThreshold", + 3000, "Log the slow jdbc query that Metastore has been waiting for the result beyond the threshold(ms), " + + "should enable the metastore.profile.jdbc.execution first"), + METASTORE_PROFILE_JDBC_EXECUTION("metastore.profile.jdbc.execution", "metastore.profile.jdbc.execution", true, + "Profile the jdbc executions, will give the histogram about the jdbc read and write if check the metrics"), + METASTORE_PROFILE_JDBC_THRIFT_APIS("metastore.jdbc.profile.thrift.apis", "metastore.jdbc.profile.thrift.apis", + "get_table_req,get_database_req", "List of thrift methods that want to monitor the underlying jdbc executions"), COMPACTOR_INITIATOR_ON("metastore.compactor.initiator.on", "hive.compactor.initiator.on", true, "Whether to run the initiator thread on this metastore instance or not.\n" + "Set this to true on one instance of the Thrift metastore service as part of turning\n" + diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandlerContext.java 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandlerContext.java index d52755786d46..ec726b5b04c3 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandlerContext.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandlerContext.java @@ -19,7 +19,9 @@ import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.handler.BaseHandler; @@ -55,6 +57,24 @@ public final class HMSHandlerContext { private Map timerContexts = new HashMap<>(); + // Unique ID of current thrift call + private CallCtx callCtx; + + public record CallCtx(String methodName, long startTime, AtomicLong totalTime) { + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) + return false; + CallCtx callCtx = (CallCtx) o; + return startTime == callCtx.startTime && Objects.equals(methodName, callCtx.methodName); + } + + @Override + public int hashCode() { + return Objects.hash(methodName, startTime); + } + } + private HMSHandlerContext() { } @@ -89,6 +109,14 @@ public static Map getTimerContexts() return context.get().timerContexts; } + public static Optional getCallCtx() { + return Optional.ofNullable(context.get().callCtx); + } + + public static void setCallCtx(CallCtx ctx) { + context.get().callCtx = ctx; + } + public static void setRawStore(RawStore rawStore) { context.get().rawStore = rawStore; } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RetryingHMSHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RetryingHMSHandler.java index 0d52107ad095..62df9e183ba6 100644 --- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RetryingHMSHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RetryingHMSHandler.java @@ -21,7 +21,9 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.UndeclaredThrowableException; +import java.util.Optional; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import javax.jdo.JDOException; @@ -44,10 +46,12 @@ public class RetryingHMSHandler extends AbstractHMSHandlerProxy { private final long retryInterval; private final int retryLimit; + private final boolean local; public RetryingHMSHandler(Configuration conf, IHMSHandler baseHandler, boolean local) throws MetaException { super(conf, baseHandler, local); + this.local = local; retryInterval = MetastoreConf.getTimeVar(origConf, ConfVars.HMS_HANDLER_INTERVAL, TimeUnit.MILLISECONDS); retryLimit = MetastoreConf.getIntVar(origConf, ConfVars.HMS_HANDLER_ATTEMPTS); @@ -87,9 +91,22 @@ public Result invokeInternal(final Object proxy, final Method method, final Obje } Object object = null; boolean isStarted = Deadline.startTimer(method.getName()); + boolean clearLocal = false; try { + if (!local) { + Optional previousCall = HMSHandlerContext.getCallCtx(); + if (previousCall.isEmpty()) { + HMSHandlerContext.CallCtx currentCall = + new HMSHandlerContext.CallCtx(method.getName(), System.currentTimeMillis(), new AtomicLong()); + HMSHandlerContext.setCallCtx(currentCall); + clearLocal = true; + } + } object = method.invoke(baseHandler, args); } finally { + if (clearLocal) { + HMSHandlerContext.setCallCtx(null); + } if (isStarted) { Deadline.stopTimer(); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DataSourceProvider.java 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DataSourceProvider.java index 78fa406444bc..6ffe619f5f9b 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DataSourceProvider.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DataSourceProvider.java @@ -20,11 +20,14 @@ import java.io.Closeable; import java.io.IOException; import java.sql.SQLException; +import java.util.Map; import java.util.Properties; +import java.util.function.Consumer; import javax.sql.DataSource; import com.google.common.collect.Iterables; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.DatabaseProduct; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; public interface DataSourceProvider { @@ -77,6 +80,9 @@ static String getMetastoreJdbcPasswd(Configuration conf) throws SQLException { } static String getMetastoreJdbcDriverUrl(Configuration conf) throws SQLException { + if (MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_PROFILE_JDBC_EXECUTION)) { + return MetastoreDriver.getMetastoreDbUrl(conf); + } return MetastoreConf.getVar(conf, MetastoreConf.ConfVars.CONNECT_URL_KEY); } @@ -84,6 +90,18 @@ static String getDataSourceName(Configuration conf) { return conf.get(DataSourceNameConfigurator.DATA_SOURCE_NAME); } + static void preparePool(Configuration configuration, Consumer initSql, + Consumer> dataSourceProps) { + String url = MetastoreConf.getVar(configuration, MetastoreConf.ConfVars.CONNECT_URL_KEY); + DatabaseProduct dbProduct = DatabaseProduct.determineDatabaseProduct(url, configuration); + String s = dbProduct.getPrepareTxnStmt(); + if (s != null) { + initSql.accept(s); + } + Map properties = dbProduct.getDataSourceProperties(); + properties.entrySet().forEach(dataSourceProps); + } + class DataSourceNameConfigurator implements Closeable { static final String 
DATA_SOURCE_NAME = "metastore.DataSourceProvider.pool.name"; private final Configuration configuration; diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java index 92160541a504..7e2ea9c75c8f 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java @@ -20,7 +20,6 @@ import java.sql.SQLException; import java.time.Duration; import java.util.Collections; -import java.util.Map; import javax.sql.DataSource; @@ -32,7 +31,6 @@ import org.apache.commons.pool2.impl.BaseObjectPoolConfig; import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.DatabaseProduct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,12 +71,6 @@ public DataSource create(Configuration hdpConfig, int maxPoolSize) throws SQLExc dbcpDs.setDefaultReadOnly(false); dbcpDs.setDefaultAutoCommit(true); - DatabaseProduct dbProduct = DatabaseProduct.determineDatabaseProduct(driverUrl, hdpConfig); - Map props = dbProduct.getDataSourceProperties(); - for (Map.Entry kv : props.entrySet()) { - dbcpDs.setConnectionProperties(kv.getKey() + "=" + kv.getValue()); - } - long connectionTimeout = hdpConfig.getLong(CONNECTION_TIMEOUT_PROPERTY, 30000L); int connectionMaxIlde = hdpConfig.getInt(CONNECTION_MAX_IDLE_PROPERTY, 8); int connectionMinIlde = hdpConfig.getInt(CONNECTION_MIN_IDLE_PROPERTY, 0); @@ -127,10 +119,16 @@ public DataSource create(Configuration hdpConfig, int maxPoolSize) throws SQLExc objectPool.setSoftMinEvictableIdleDuration(Duration.ofMillis(600 * 1000)); } } - String stmt = 
dbProduct.getPrepareTxnStmt(); - if (stmt != null) { - poolableConnFactory.setConnectionInitSql(Collections.singletonList(stmt)); - } + StringBuilder connectionProperties = new StringBuilder(); + DataSourceProvider.preparePool(hdpConfig, + stmt -> poolableConnFactory.setConnectionInitSql(Collections.singletonList(stmt)), + kv -> { + if (connectionProperties.length() > 0) { + connectionProperties.append(';'); + } + connectionProperties.append(kv.getKey()).append('=').append(kv.getValue()); + dbcpDs.setConnectionProperties(connectionProperties.toString()); + }); return new PoolingDataSource(objectPool); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java index 91f1ea3d5574..5cdab60e2be5 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java @@ -21,7 +21,6 @@ import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.DatabaseProduct; import org.apache.hadoop.hive.metastore.metrics.Metrics; import org.apache.hadoop.hive.metastore.utils.StringUtils; import org.slf4j.Logger; @@ -29,7 +28,6 @@ import javax.sql.DataSource; import java.sql.SQLException; -import java.util.Map; import java.util.Properties; /** @@ -77,19 +75,8 @@ public DataSource create(Configuration hdpConfig, int maxPoolSize) throws SQLExc config.setMinimumIdle(Math.min(maxPoolSize, minimumIdle)); } - DatabaseProduct dbProduct = DatabaseProduct.determineDatabaseProduct(driverUrl, hdpConfig); - - String s = dbProduct.getPrepareTxnStmt(); - if (s!= null) { - 
config.setConnectionInitSql(s); - } - - Map props = dbProduct.getDataSourceProperties(); - - for ( Map.Entry kv : props.entrySet()) { - config.addDataSourceProperty(kv.getKey(), kv.getValue()); - } - + DataSourceProvider.preparePool(hdpConfig, s -> config.setConnectionInitSql(s), + entry -> config.addDataSourceProperty(entry.getKey(), entry.getValue())); return new HikariDataSource(initMetrics(config)); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/MetastoreConnection.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/MetastoreConnection.java new file mode 100644 index 000000000000..7760d62870f2 --- /dev/null +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/MetastoreConnection.java @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.metastore.datasource; + +import java.sql.Array; +import java.sql.Blob; +import java.sql.CallableStatement; +import java.sql.Clob; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.NClob; +import java.sql.PreparedStatement; +import java.sql.SQLClientInfoException; +import java.sql.SQLException; +import java.sql.SQLWarning; +import java.sql.SQLXML; +import java.sql.Savepoint; +import java.sql.Statement; +import java.sql.Struct; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executor; + +import org.apache.hadoop.conf.Configuration; + +public record MetastoreConnection(Connection delegate, Configuration configuration) implements Connection { + @Override + public Statement createStatement() throws SQLException { + return MetastoreStatement.getProxyStatement(configuration, delegate.createStatement(), null); + } + + @Override + public PreparedStatement prepareStatement(String sql) throws SQLException { + return MetastoreStatement.getProxyStatement(configuration, delegate.prepareStatement(sql), sql); + } + + @Override + public CallableStatement prepareCall(String sql) throws SQLException { + return MetastoreStatement.getProxyStatement(configuration, delegate.prepareCall(sql), sql); + } + + @Override + public String nativeSQL(String sql) throws SQLException { + return delegate.nativeSQL(sql); + } + + @Override + public void setAutoCommit(boolean autoCommit) throws SQLException { + delegate.setAutoCommit(autoCommit); + } + + @Override + public boolean getAutoCommit() throws SQLException { + return delegate.getAutoCommit(); + } + + @Override + public void commit() throws SQLException { + delegate.commit(); + } + + @Override + public void rollback() throws SQLException { + delegate.rollback(); + } + + @Override + public void close() throws SQLException { + delegate.close(); + } + + @Override + public boolean isClosed() throws SQLException { + return delegate.isClosed(); + } 
+ + @Override + public DatabaseMetaData getMetaData() throws SQLException { + return delegate.getMetaData(); + } + + @Override + public void setReadOnly(boolean readOnly) throws SQLException { + delegate.setReadOnly(readOnly); + } + + @Override + public boolean isReadOnly() throws SQLException { + return delegate.isReadOnly(); + } + + @Override + public void setCatalog(String catalog) throws SQLException { + delegate.setCatalog(catalog); + } + + @Override + public String getCatalog() throws SQLException { + return delegate.getCatalog(); + } + + @Override + public void setTransactionIsolation(int level) throws SQLException { + delegate.setTransactionIsolation(level); + } + + @Override + public int getTransactionIsolation() throws SQLException { + return delegate.getTransactionIsolation(); + } + + @Override + public SQLWarning getWarnings() throws SQLException { + return delegate.getWarnings(); + } + + @Override + public void clearWarnings() throws SQLException { + delegate.clearWarnings(); + } + + @Override + public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { + return MetastoreStatement.getProxyStatement(configuration, + delegate.createStatement(resultSetType, resultSetConcurrency), null); + } + + @Override + public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) + throws SQLException { + return MetastoreStatement.getProxyStatement(configuration, + delegate.prepareStatement(sql, resultSetType, resultSetConcurrency), sql); + } + + @Override + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { + return MetastoreStatement.getProxyStatement(configuration, + delegate.prepareCall(sql, resultSetType, resultSetConcurrency), sql); + } + + @Override + public Map> getTypeMap() throws SQLException { + return delegate.getTypeMap(); + } + + @Override + public void setTypeMap(Map> map) throws SQLException { + 
delegate.setTypeMap(map); + } + + @Override + public void setHoldability(int holdability) throws SQLException { + delegate.setHoldability(holdability); + } + + @Override + public int getHoldability() throws SQLException { + return delegate.getHoldability(); + } + + @Override + public Savepoint setSavepoint() throws SQLException { + return delegate.setSavepoint(); + } + + @Override + public Savepoint setSavepoint(String name) throws SQLException { + return delegate.setSavepoint(name); + } + + @Override + public void rollback(Savepoint savepoint) throws SQLException { + delegate.rollback(savepoint); + } + + @Override + public void releaseSavepoint(Savepoint savepoint) throws SQLException { + delegate.releaseSavepoint(savepoint); + } + + @Override + public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) + throws SQLException { + return MetastoreStatement.getProxyStatement(configuration, + delegate.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability), null); + } + + @Override + public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, + int resultSetHoldability) throws SQLException { + return MetastoreStatement.getProxyStatement(configuration, + delegate.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability), sql); + } + + @Override + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, + int resultSetHoldability) throws SQLException { + return MetastoreStatement.getProxyStatement(configuration, + delegate.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability), sql); + } + + @Override + public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { + return MetastoreStatement.getProxyStatement(configuration, delegate.prepareStatement(sql, autoGeneratedKeys), sql); + } + + @Override + public PreparedStatement prepareStatement(String sql, 
int[] columnIndexes) throws SQLException { + return MetastoreStatement.getProxyStatement(configuration, delegate.prepareStatement(sql, columnIndexes), sql); + } + + @Override + public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { + return MetastoreStatement.getProxyStatement(configuration, delegate.prepareStatement(sql, columnNames), sql); + } + + @Override + public Clob createClob() throws SQLException { + return delegate.createClob(); + } + + @Override + public Blob createBlob() throws SQLException { + return delegate.createBlob(); + } + + @Override + public NClob createNClob() throws SQLException { + return delegate.createNClob(); + } + + @Override + public SQLXML createSQLXML() throws SQLException { + return delegate.createSQLXML(); + } + + @Override + public boolean isValid(int timeout) throws SQLException { + return delegate.isValid(timeout); + } + + @Override + public void setClientInfo(String name, String value) throws SQLClientInfoException { + delegate.setClientInfo(name, value); + } + + @Override + public void setClientInfo(Properties properties) throws SQLClientInfoException { + delegate.setClientInfo(properties); + } + + @Override + public String getClientInfo(String name) throws SQLException { + return delegate.getClientInfo(name); + } + + @Override + public Properties getClientInfo() throws SQLException { + return delegate.getClientInfo(); + } + + @Override + public Array createArrayOf(String typeName, Object[] elements) throws SQLException { + return delegate.createArrayOf(typeName, elements); + } + + @Override + public Struct createStruct(String typeName, Object[] attributes) throws SQLException { + return delegate.createStruct(typeName, attributes); + } + + @Override + public void setSchema(String schema) throws SQLException { + delegate.setSchema(schema); + } + + @Override + public String getSchema() throws SQLException { + return delegate.getSchema(); + } + + @Override + public void abort(Executor 
executor) throws SQLException { + delegate.abort(executor); + } + + @Override + public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { + delegate.setNetworkTimeout(executor, milliseconds); + } + + @Override + public int getNetworkTimeout() throws SQLException { + return delegate.getNetworkTimeout(); + } + + @Override + public T unwrap(Class iface) throws SQLException { + if (iface.isInstance(this)) { + return iface.cast(this); + } + return delegate.unwrap(iface); + } + + @Override + public boolean isWrapperFor(Class iface) throws SQLException { + return iface.isInstance(this) || delegate.isWrapperFor(iface); + } +} diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/MetastoreDriver.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/MetastoreDriver.java new file mode 100644 index 000000000000..f6e8ef6e6367 --- /dev/null +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/MetastoreDriver.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.metastore.datasource; + +import com.google.common.annotations.VisibleForTesting; + +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.DriverPropertyInfo; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.List; +import java.util.Properties; +import java.util.logging.Logger; +import java.util.regex.Pattern; + +import org.apache.commons.lang3.math.NumberUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.utils.MetastoreVersionInfo; +import org.slf4j.LoggerFactory; + +import static java.sql.DriverManager.registerDriver; + +public class MetastoreDriver implements Driver { + private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(MetastoreDriver.class); + private static final String URL_PREFIX = "jdbc:metastore://"; + private static final Configuration configuration; + private static int majorVersion = -1; + private static int minorVersion = -1; + private static volatile Driver delegateDriver; + static { + try { + registerDriver(new MetastoreDriver()); + String versionString = MetastoreVersionInfo.getVersion(); + String[] versionNums = versionString.split("\\."); + if (NumberUtils.isNumber(versionNums[0])) { + majorVersion = Integer.parseInt(versionNums[0]); + } + if (versionNums.length >1 && NumberUtils.isNumber(versionNums[1])) { + minorVersion = Integer.parseInt(versionNums[1]); + } + configuration = MetastoreConf.newMetastoreConf(); + } catch (Exception e) { + throw new RuntimeException("Failed to register Metastore driver", e); + } + } + + private synchronized static Driver + findRegisteredDriver(String jdbcUrl, String driverClassName) throws SQLException { + if (delegateDriver != null && delegateDriver.acceptsURL(jdbcUrl)) { + // Use the cached driver + 
return delegateDriver; + } + List candidates = new ArrayList<>(); + for (Enumeration drivers = DriverManager.getDrivers(); drivers.hasMoreElements();) { + Driver driver = drivers.nextElement(); + try { + if (driver.acceptsURL(jdbcUrl)) { + candidates.add(driver); + } + } catch (Exception e) { + } + } + + if (candidates.isEmpty()) { + Class driverClz = tryLoadDriver(driverClassName, Thread.currentThread().getContextClassLoader(), + MetastoreDriver.class.getClassLoader()); + if (driverClz != null) { + try { + Driver driver = driverClz.getDeclaredConstructor().newInstance(); + if (!driver.acceptsURL(jdbcUrl)) { + throw new RuntimeException("Driver " + driverClassName + " cannot accept jdbcUrl"); + } + candidates.add(driver); + } catch (Exception e) { + LOG.warn("Failed to create instance of driver class {}", driverClassName, e); + } + } + } + delegateDriver = candidates.isEmpty() ? DriverManager.getDriver(jdbcUrl) : candidates.getFirst(); + return delegateDriver; + } + + private static Class tryLoadDriver(String driverClassName, ClassLoader... loaders) { + for (ClassLoader loader : loaders) { + if (loader != null) { + try { + return (Class) loader.loadClass(driverClassName); + } catch (ClassNotFoundException e) { + LOG.debug("Driver class {} not found in class loader {}", driverClassName, loader); + } + } + } + return null; + } + + @Override + public Connection connect(String url, Properties info) throws SQLException { + if (!acceptsURL(url)) { + return null; + } + // The url should match jdbc:metastore://:, the is + // the real database this MetastoreDriver should connect to. 
+ String driverAndUrl = url.substring(URL_PREFIX.length()); + String defaultDriverClz = driverAndUrl.split(":")[0]; + String jdbcUrl = driverAndUrl.substring(defaultDriverClz.length() + 1); + Connection connection; + Driver driver = delegateDriver; + if (driver == null || !driver.acceptsURL(jdbcUrl)) { + driver = findRegisteredDriver(jdbcUrl, defaultDriverClz); + } + connection = driver.connect(jdbcUrl, info); + return connection == null ? null : new MetastoreConnection(connection, configuration); + } + + @Override + public boolean acceptsURL(String url) throws SQLException { + return url != null && Pattern.matches(URL_PREFIX + ".*", url); + } + + @Override + public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException { + // An empty array if no properties are required. + return new DriverPropertyInfo[0]; + } + + @Override + public int getMajorVersion() { + return majorVersion; + } + + @Override + public int getMinorVersion() { + return minorVersion; + } + + @Override + public boolean jdbcCompliant() { + return false; + } + + @Override + public Logger getParentLogger() throws SQLFeatureNotSupportedException { + return delegateDriver.getParentLogger(); + } + + public static String getMetastoreDbUrl(Configuration configuration) { + String delegateUrl = MetastoreConf.getVar(configuration, MetastoreConf.ConfVars.CONNECT_URL_KEY); + String driverClz = MetastoreConf.getVar(configuration, MetastoreConf.ConfVars.CONNECTION_DRIVER); + return MetastoreDriver.URL_PREFIX + driverClz + ":" + delegateUrl; + } + + @VisibleForTesting + static Configuration getConfiguration() { + return configuration; + } +} diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/MetastoreStatement.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/MetastoreStatement.java new file mode 100644 index 000000000000..140798434f5b --- /dev/null +++ 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hive.metastore.datasource;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.lang.reflect.UndeclaredThrowableException;
import java.sql.Statement;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HMSHandlerContext;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.metrics.Metrics;
import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
import org.apache.hadoop.hive.metastore.utils.JavaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.hive.metastore.datasource.MetastoreStatement.JdbcProfilerUtils.logSlowExecution;
import static org.apache.hadoop.hive.metastore.datasource.MetastoreStatement.JdbcProfilerUtils.isSlowExecution;

/**
 * Dynamic-proxy {@link InvocationHandler} wrapped around a JDBC {@link Statement} so the
 * Metastore can profile its executions: it times monitored calls through the metrics registry,
 * accumulates per-thrift-call totals via {@link HMSHandlerContext.CallCtx}, and logs/counts
 * slow executions. The concrete profiling policy is pluggable through
 * {@link MetastoreStatementHook} (configured via {@link #EXEC_HOOK}); by default
 * {@link JdbcProfilerUtils} is used.
 */
@SuppressWarnings("unchecked")
public final class MetastoreStatement implements InvocationHandler {
  private static final Logger LOG = LoggerFactory.getLogger(MetastoreStatement.class);
  // The thrift call whose JDBC time is currently being accumulated on this thread; compared
  // against HMSHandlerContext to detect when the previous thrift call has finished.
  private static final ThreadLocal<HMSHandlerContext.CallCtx> CALL_CTX = new ThreadLocal<>();
  /** Config key naming a custom {@link MetastoreStatementHook} implementation class. */
  static final String EXEC_HOOK = "metastore.jdbc.execution.hook";

  // Raw SQL of a PreparedStatement; null for a plain Statement where the SQL arrives via args.
  private final String rawSql;
  private final Statement delegate;
  private final Configuration configuration;
  private final MetastoreStatementHook hook;

  private MetastoreStatement(Configuration conf, Statement statement, String rawSql) {
    this.configuration = Objects.requireNonNull(conf);
    this.rawSql = rawSql;
    this.delegate = Objects.requireNonNull(statement);
    String className = conf.get(EXEC_HOOK, "");
    if (StringUtils.isEmpty(className)) {
      hook = new JdbcProfilerUtils(conf);
    } else {
      try {
        hook = JavaUtils.newInstance(JavaUtils.getClass(className.trim(), MetastoreStatementHook.class),
            new Class[] { Configuration.class }, new Object[] { conf });
      } catch (MetaException e) {
        throw new RuntimeException(e.getMessage(), e);
      }
    }
  }

  /**
   * Wraps the given statement in a profiling proxy implementing all of the delegate's interfaces.
   *
   * @param configuration metastore configuration driving the profiling behavior
   * @param delegate the real JDBC statement to forward calls to
   * @param rawSql the SQL bound to the statement, or null when not known up front
   * @return a proxy that profiles every call before delegating
   */
  public static <T extends Statement> T getProxyStatement(Configuration configuration, T delegate, String rawSql) {
    MetastoreStatement handler = new MetastoreStatement(configuration, delegate, rawSql);
    return (T) Proxy.newProxyInstance(JavaUtils.getClassLoader(),
        ClassUtils.getAllInterfaces(delegate.getClass()).toArray(new Class[0]), handler);
  }

  /**
   * Detects the boundary between thrift calls on this thread and, when the previous call has
   * ended, logs the total JDBC time it spent (warning if it exceeded the slow-query threshold).
   *
   * @param monitor whether the current call should start being tracked in {@link #CALL_CTX}
   */
  private void logSummary(boolean monitor) {
    Optional<HMSHandlerContext.CallCtx> ctxCall = HMSHandlerContext.getCallCtx();
    HMSHandlerContext.CallCtx previousCall = CALL_CTX.get();
    if (ctxCall.isPresent()) {
      if (previousCall == null) {
        if (monitor) {
          CALL_CTX.set(ctxCall.get());
        }
      } else if (!ctxCall.get().equals(previousCall)) {
        // We have reached the end of the previous thrift call: report its accumulated JDBC time.
        long totalSpent = previousCall.totalTime().get();
        LOG.debug("{} took {} ms to complete all queries", previousCall.methodName(), totalSpent);
        if (isSlowExecution(configuration, totalSpent)) {
          LOG.warn("{} took {} ms to complete all queries", previousCall.methodName(), totalSpent);
        }
        if (monitor) {
          CALL_CTX.set(ctxCall.get());
        } else {
          CALL_CTX.remove();
        }
      }
    }
  }

  /**
   * Forwards the call to the delegate statement, surrounding it with hook callbacks, optional
   * timer metrics for monitored calls, per-thrift-call time accumulation, and slow-query logging.
   */
  @Override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    Timer.Context ctx = null;
    try {
      boolean monitor = hook.profile(rawSql, method, args);
      logSummary(monitor);
      if (Metrics.getRegistry() != null && monitor) {
        String metricName = hook.getMetricName(method, args);
        Timer timer = Metrics.getOrCreateTimer(metricName);
        if (timer != null) {
          ctx = timer.time();
        }
      }
      long start = System.currentTimeMillis();
      hook.preRun(method, args);
      Object result = method.invoke(delegate, args);
      hook.postRun(method, args, result);
      long timeSpent = System.currentTimeMillis() - start;
      if (monitor) {
        // For a plain Statement the SQL is the first argument of execute/executeQuery/...;
        // executeBatch carries no SQL at all.
        String statement = rawSql != null ? rawSql : (args != null && args.length > 0 ? (String) args[0] : "no sql found");
        LOG.debug("Jdbc query: {} completed in {} ms", statement, timeSpent);
        if (CALL_CTX.get() != null) {
          CALL_CTX.get().totalTime().addAndGet(timeSpent);
        }
      }
      // Slow executions are always reported, even for calls that are not monitored.
      logSlowExecution(timeSpent, configuration, rawSql, method, args);
      return result;
    } catch (InvocationTargetException | UndeclaredThrowableException e) {
      // Unwrap so callers see the delegate's real exception. Guard against a null cause
      // (possible for UndeclaredThrowableException) — `throw null` would raise an NPE here.
      Throwable cause = e.getCause();
      throw cause != null ? cause : e;
    } finally {
      if (ctx != null) {
        ctx.stop();
      }
    }
  }

  public interface MetastoreStatementHook {
    /**
     * Whether the current call should be monitored; this method gives a chance to profile a
     * specific pattern of queries. For example, {@link JdbcProfilerUtils} profiles the queries
     * originated from a set of specific APIs.
     * @param sql The sql being executed; it might be null for {@link Statement#execute}, in
     *            which case the sql needs to be obtained from args, the method input.
     * @param method Method which is being called
     * @param args The method input
     * @return true for profiling this call, false otherwise
     */
    boolean profile(String sql, Method method, Object[] args);

    /**
     * Name of the timer metric under which monitored calls of this method are recorded.
     * @param method Method which is being called
     * @param args The method input
     */
    String getMetricName(Method method, Object[] args);

    /**
     * Invoked before the method call
     * @param method Method which is being called
     * @param args The method input
     */
    default void preRun(Method method, Object[] args) {

    }

    /**
     * Invoked after the method call
     * @param method Method which is being called
     * @param args The method input
     * @param result The execution result from the call
     */
    default void postRun(Method method, Object[] args, Object result) {

    }
  }

  /**
   * This class is used to profile the underlying statement originated from specific thrift API
   * calls. Configuration is read once per JVM on first use.
   */
  public static class JdbcProfilerUtils implements MetastoreStatement.MetastoreStatementHook {
    // Thrift API names whose JDBC executions should be profiled; populated once in initialize().
    private static final Set<String> PROFILED_APIS = new HashSet<>();
    // Statement methods that actually run SQL; only these are candidates for profiling.
    static final Set<String> QUERY_EXECUTION =
        Set.of("executeQuery", "executeUpdate", "execute", "executeBatch");
    private static volatile boolean initialized = false;
    private static long logSlowQueriesThreshold;

    public JdbcProfilerUtils(Configuration configuration) {
      initialize(Objects.requireNonNull(configuration));
    }

    /**
     * One-time, thread-safe initialization of the slow-query threshold and the set of
     * profiled thrift APIs, using double-checked locking on the volatile flag.
     */
    private static void initialize(Configuration configuration) {
      if (!initialized) {
        synchronized (JdbcProfilerUtils.class) {
          if (!initialized) {
            logSlowQueriesThreshold = MetastoreConf.getLongVar(configuration,
                MetastoreConf.ConfVars.METASTORE_JDBC_SLOW_QUERIES);
            if (logSlowQueriesThreshold > 0) {
              LOG.info("The slow query log enabled, will log the query that takes more than {} ms",
                  logSlowQueriesThreshold);
            }
            String thriftApis = MetastoreConf.getVar(configuration,
                MetastoreConf.ConfVars.METASTORE_PROFILE_JDBC_THRIFT_APIS);
            for (String thriftApi : thriftApis.split(",")) {
              String trimmedThriftApi = thriftApi.trim();
              if (!trimmedThriftApi.isEmpty()) {
                PROFILED_APIS.add(trimmedThriftApi);
              }
            }
            // Publish the flag LAST: the volatile write guarantees that any thread observing
            // initialized == true also sees the threshold and PROFILED_APIS fully populated.
            // (Setting it first would let racing threads read half-initialized state.)
            initialized = true;
          }
        }
      }
    }

    /**
     * Logs a warning and bumps the slow-query counter when the execution exceeded the
     * configured threshold. Long argument lists are elided to their first 7 and last 2 entries.
     */
    public static void logSlowExecution(long timeSpent, Configuration configuration,
        String sql, Method method, Object[] args) {
      if (isSlowExecution(configuration, timeSpent)) {
        Object[] printableArgs = args;
        if (args != null && args.length > 10) {
          printableArgs = new Object[10];
          System.arraycopy(args, 0, printableArgs, 0, 7);
          System.arraycopy(args, args.length - 2, printableArgs, 8, 2);
          printableArgs[7] = "....";
        }
        LOG.warn("Slow execution detected, method: {}, time taken: {} ms, args size: {}, args: {}{}",
            method.getName(), timeSpent,
            args != null ? args.length : 0, Arrays.toString(printableArgs), sql != null ? ", sql: " + sql : "");
        MetricRegistry registry = Metrics.getRegistry();
        if (registry != null) {
          Counter slowQueries = Metrics.getOrCreateCounter(MetricsConstants.JDBC_SLOW_QUERIES);
          slowQueries.inc();
        }
      }
    }

    /**
     * @return true when slow-query logging is enabled (threshold > 0) and the execution
     *         exceeded the threshold
     */
    public static boolean isSlowExecution(Configuration configuration, long timeSpent) {
      initialize(configuration);
      return logSlowQueriesThreshold > 0 && timeSpent > logSlowQueriesThreshold;
    }

    @Override
    public boolean profile(String sql, Method method, Object[] args) {
      if (PROFILED_APIS.isEmpty() || !QUERY_EXECUTION.contains(method.getName())) {
        // no api configured to profile, or not a query-executing method
        return false;
      }
      Optional<HMSHandlerContext.CallCtx> ctxCall = HMSHandlerContext.getCallCtx();
      if (ctxCall.isPresent()) {
        String call = ctxCall.get().methodName();
        return PROFILED_APIS.contains(call);
      }
      return false;
    }

    @Override
    public String getMetricName(Method method, Object[] args) {
      return MetricsConstants.JDBC_EXECUTION + method.getName();
    }
  }
}
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/MetricsConstants.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/MetricsConstants.java index 46512ab3c4ba..cd8f36452de9 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/MetricsConstants.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/MetricsConstants.java @@ -71,6 +71,8 @@ public class MetricsConstants { public static final String DELETE_TOTAL_PARTITIONS = "delete_total_count_partitions"; public static final String DIRECTSQL_ERRORS = "directsql_errors"; + public static final String JDBC_SLOW_QUERIES = "jdbc_slow_queries"; + public static final String JDBC_EXECUTION = "jdbc_"; public static final String JVM_PAUSE_INFO = "jvm.pause.info-threshold"; public static final String JVM_PAUSE_WARN = "jvm.pause.warn-threshold"; diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/datasource/TestMetastoreConnection.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/datasource/TestMetastoreConnection.java new file mode 100644 index 000000000000..dff9c0d71a82 --- /dev/null +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/datasource/TestMetastoreConnection.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
package org.apache.hadoop.hive.metastore.datasource;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Timer;
import com.zaxxer.hikari.HikariDataSource;

import javax.sql.DataSource;
import java.lang.reflect.Method;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.dbcp2.PoolingDataSource;
import org.apache.derby.impl.jdbc.EmbedConnection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HMSHandlerContext;
import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.metrics.Metrics;
import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Verifies that pooled connections (HikariCP and DBCP) hand out statements proxied by
 * {@link MetastoreStatement}: monitored queries feed the profiling timer and slow executions
 * bump the slow-query counter. A custom hook injects an artificial delay per query.
 */
@Category(MetastoreUnitTest.class)
public class TestMetastoreConnection {
  private Configuration conf;
  // Shared counter backing MetricsConstants.JDBC_SLOW_QUERIES; deltas are asserted in verify().
  private Counter slowQuery;

  @Before
  public void init() {
    conf = MetastoreDriver.getConfiguration();
    // Install the sleeping test hook and a low (200 ms) slow-query threshold so the
    // 300 ms artificial delay below is reported as slow.
    conf.set(MetastoreStatement.EXEC_HOOK, MetastoreStatementTestHook.class.getName());
    MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.METASTORE_JDBC_SLOW_QUERIES, 200);
    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED, true);
    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METASTORE_PROFILE_JDBC_EXECUTION, true);
    // Only the pseudo thrift API "test_metastore_statement" is profiled.
    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.METASTORE_PROFILE_JDBC_THRIFT_APIS, "test_metastore_statement");
    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CONNECTION_USER_NAME, "dummyUser");
    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.PWD, "dummyPass");
    conf.unset(MetastoreConf.ConfVars.CONNECTION_POOLING_TYPE.getVarname());

    Metrics.initialize(conf);
    slowQuery = Metrics.getOrCreateCounter(MetricsConstants.JDBC_SLOW_QUERIES);
  }

  @Test
  public void testDefaultHikariCp() throws Exception {
    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CONNECTION_POOLING_TYPE, HikariCPDataSourceProvider.HIKARI);

    DataSourceProvider dsp = DataSourceProviderFactory.tryGetDataSourceProviderOrNull(conf);
    Assert.assertNotNull(dsp);
    DataSource ds = dsp.create(conf);
    Assert.assertTrue(ds instanceof HikariDataSource);
    try (Connection connection = ds.getConnection()) {
      verify(connection);
    }
  }

  @Test
  public void testDbCpDataSource() throws Exception {
    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CONNECTION_POOLING_TYPE, DbCPDataSourceProvider.DBCP);

    DataSourceProvider dsp = DataSourceProviderFactory.tryGetDataSourceProviderOrNull(conf);
    Assert.assertNotNull(dsp);
    DataSource ds = dsp.create(conf);
    Assert.assertTrue(ds instanceof PoolingDataSource);
    try (Connection connection = ds.getConnection()) {
      verify(connection);
    }
  }

  /**
   * Runs one monitored and one unmonitored slow query over the given pooled connection and
   * asserts the expected metric deltas: the slow-query counter grows for both, but the
   * profiling timer only records the query issued under the profiled pseudo API.
   */
  private void verify(Connection connection) throws Exception {
    // The pooled connection should wrap an embedded Derby connection.
    Assert.assertTrue(connection.unwrap(MetastoreConnection.class).delegate() instanceof EmbedConnection);
    long slowNum = slowQuery.getCount();
    Timer timer = Metrics.getOrCreateTimer(MetastoreStatementTestHook.TEST_METRIC_NAME);
    Assert.assertNotNull(timer);
    long timeCount = timer.getCount();
    // Query under the profiled API name with a 300 ms delay (> 200 ms threshold).
    try (AutoCloseable sleep = MetastoreStatementTestHook.testConnection("test_metastore_statement", 300)) {
      try (Statement statement = connection.createStatement();
          ResultSet rs = statement.executeQuery("VALUES 1")) {
        Assert.assertTrue(rs.next());
      }
    }
    Assert.assertEquals(slowNum + 1, slowQuery.getCount());
    Assert.assertEquals(timeCount + 1, timer.getCount());
    Assert.assertTrue(timer.getSnapshot().getMean() > TimeUnit.MILLISECONDS.toNanos(300));

    // Test a method outside of the monitored API set
    try (AutoCloseable sleep = MetastoreStatementTestHook.testConnection("test_statement_outside", 300)) {
      try (Statement statement = connection.createStatement();
          ResultSet rs = statement.executeQuery("VALUES 1")) {
        Assert.assertTrue(rs.next());
      }
    }
    // the slow query is still recorded even though the API is not monitored
    Assert.assertEquals(slowNum + 2, slowQuery.getCount());
    // but the timer does not count this unmonitored method
    Assert.assertEquals(timeCount + 1, timer.getCount());
  }

  /**
   * Test hook that sleeps before every query execution to simulate slow JDBC calls, and
   * reports all monitored executions under a single fixed timer metric name.
   */
  public static class MetastoreStatementTestHook extends MetastoreStatement.JdbcProfilerUtils {
    static final String TEST_METRIC_NAME = "MetastoreStatementTestHook_" + System.currentTimeMillis();
    static final String ENABLE_SLEEP_FOR_QUERY = "MetastoreStatementTestHook.should.sleep";
    static final String SLEEP_MILLIS = "MetastoreStatementTestHook.sleep.ms";
    private final boolean shouldSleep;
    private final long sleepMs;

    public MetastoreStatementTestHook(Configuration configuration) {
      super(configuration);
      shouldSleep = configuration.getBoolean(ENABLE_SLEEP_FOR_QUERY, false);
      sleepMs = configuration.getLong(SLEEP_MILLIS, 1000);
    }

    @Override
    public void preRun(Method method, Object[] args) {
      // Only delay actual query executions, not metadata calls like close()/setFetchSize().
      if (shouldSleep &&
          MetastoreStatement.JdbcProfilerUtils.QUERY_EXECUTION.contains(method.getName())) {
        try {
          Thread.sleep(sleepMs);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    }

    @Override
    public String getMetricName(Method method, Object[] args) {
      return TEST_METRIC_NAME;
    }

    /**
     * Simulates an in-flight thrift call named {@code method} with the given per-query delay.
     * Returns an AutoCloseable that restores the configuration and clears the call context.
     */
    public static AutoCloseable testConnection(String method, long sleepMs) {
      Configuration configuration = MetastoreDriver.getConfiguration();
      configuration.setLong(SLEEP_MILLIS, sleepMs);
      configuration.setBoolean(ENABLE_SLEEP_FOR_QUERY, true);
      HMSHandlerContext
          .setCallCtx(new HMSHandlerContext.CallCtx(method, System.currentTimeMillis(), new AtomicLong()));
      return () -> {
        configuration.unset(ENABLE_SLEEP_FOR_QUERY);
        configuration.unset(SLEEP_MILLIS);
        HMSHandlerContext.setCallCtx(null);
      };
    }
  }
}