Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,13 @@ public enum ConfVars {
"hive.txn.acid.metrics.delta.pct.threshold", 0.01f,
"Percentage (fractional) size of the delta files relative to the base directory. Deltas smaller than this threshold " +
"count as small deltas. Default 0.01 = 1%.)"),
METASTORE_JDBC_SLOW_QUERIES("metastore.jdbc.execution.logSlowQueriesThreshold", "metastore.jdbc.execution.logSlowQueriesThreshold",
3000, "Log the slow jdbc query that Metastore has been waiting for the result beyond the threshold(ms), " +
"should enable the metastore.profile.jdbc.execution first"),
METASTORE_PROFILE_JDBC_EXECUTION("metastore.profile.jdbc.execution", "metastore.profile.jdbc.execution", true,
Comment thread
dengzhhu653 marked this conversation as resolved.
"Profile the jdbc executions, will give the histogram about the jdbc read and write if check the metrics"),
Comment on lines +652 to +655
METASTORE_PROFILE_JDBC_THRIFT_APIS("metastore.jdbc.profile.thrift.apis", "metastore.jdbc.profile.thrift.apis",
"get_table_req,get_database_req", "List of thrift methods that want to monitor the underlying jdbc executions"),
COMPACTOR_INITIATOR_ON("metastore.compactor.initiator.on", "hive.compactor.initiator.on", true,
"Whether to run the initiator thread on this metastore instance or not.\n" +
"Set this to true on one instance of the Thrift metastore service as part of turning\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.handler.BaseHandler;
Expand Down Expand Up @@ -55,6 +57,24 @@ public final class HMSHandlerContext {

private Map<String, com.codahale.metrics.Timer.Context> timerContexts = new HashMap<>();

// Unique ID of current thrift call
private CallCtx callCtx;

public record CallCtx(String methodName, long startTime, AtomicLong totalTime) {
@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass())
return false;
CallCtx callCtx = (CallCtx) o;
return startTime == callCtx.startTime && Objects.equals(methodName, callCtx.methodName);
}

@Override
public int hashCode() {
return Objects.hash(methodName, startTime);
}
}

// Not instantiable directly: instances are handed out per calling thread via
// the static accessors backed by the `context` holder (presumably a
// ThreadLocal — its declaration is outside this hunk; confirm).
private HMSHandlerContext() {
}

Expand Down Expand Up @@ -89,6 +109,14 @@ public static Map<String, com.codahale.metrics.Timer.Context> getTimerContexts()
return context.get().timerContexts;
}

/**
 * Returns the call context recorded for the current thread, or an empty
 * {@link Optional} when no thrift call context has been set.
 */
public static Optional<CallCtx> getCallCtx() {
  HMSHandlerContext current = context.get();
  return Optional.ofNullable(current.callCtx);
}

/**
 * Records {@code ctx} as the call context of the current thread; pass
 * {@code null} to clear a previously recorded context.
 */
public static void setCallCtx(CallCtx ctx) {
  HMSHandlerContext current = context.get();
  current.callCtx = ctx;
}

/** Attaches the given {@code RawStore} to the current thread's context. */
public static void setRawStore(RawStore rawStore) {
  HMSHandlerContext current = context.get();
  current.rawStore = rawStore;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import javax.jdo.JDOException;

Expand All @@ -44,10 +46,12 @@ public class RetryingHMSHandler extends AbstractHMSHandlerProxy {

private final long retryInterval;
private final int retryLimit;
private final boolean local;

public RetryingHMSHandler(Configuration conf, IHMSHandler baseHandler, boolean local)
throws MetaException {
super(conf, baseHandler, local);
this.local = local;
retryInterval = MetastoreConf.getTimeVar(origConf,
ConfVars.HMS_HANDLER_INTERVAL, TimeUnit.MILLISECONDS);
retryLimit = MetastoreConf.getIntVar(origConf, ConfVars.HMS_HANDLER_ATTEMPTS);
Expand Down Expand Up @@ -87,9 +91,22 @@ public Result invokeInternal(final Object proxy, final Method method, final Obje
}
Object object = null;
boolean isStarted = Deadline.startTimer(method.getName());
boolean clearLocal = false;
try {
if (!local) {
Optional<HMSHandlerContext.CallCtx> previousCall = HMSHandlerContext.getCallCtx();
if (previousCall.isEmpty()) {
HMSHandlerContext.CallCtx currentCall =
new HMSHandlerContext.CallCtx(method.getName(), System.currentTimeMillis(), new AtomicLong());
HMSHandlerContext.setCallCtx(currentCall);
clearLocal = true;
}
}
object = method.invoke(baseHandler, args);
} finally {
if (clearLocal) {
HMSHandlerContext.setCallCtx(null);
}
Comment on lines +96 to +109
if (isStarted) {
Deadline.stopTimer();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
import java.io.Closeable;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Map;
import java.util.Properties;
import java.util.function.Consumer;
import javax.sql.DataSource;

import com.google.common.collect.Iterables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.DatabaseProduct;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;

public interface DataSourceProvider {
Expand Down Expand Up @@ -77,13 +80,28 @@ static String getMetastoreJdbcPasswd(Configuration conf) throws SQLException {
}

/**
 * Resolves the JDBC URL the connection pool should use. When
 * {@code metastore.profile.jdbc.execution} is enabled, the URL produced by
 * {@code MetastoreDriver} is returned instead of the raw connect URL
 * (presumably so the driver can intercept executions for profiling — confirm
 * against MetastoreDriver); otherwise the configured connect URL is used
 * unchanged.
 */
static String getMetastoreJdbcDriverUrl(Configuration conf) throws SQLException {
  boolean profilingEnabled =
      MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_PROFILE_JDBC_EXECUTION);
  return profilingEnabled
      ? MetastoreDriver.getMetastoreDbUrl(conf)
      : MetastoreConf.getVar(conf, MetastoreConf.ConfVars.CONNECT_URL_KEY);
}

/**
 * Returns the configured pool name for this data source, or {@code null}
 * when the property is not set.
 */
static String getDataSourceName(Configuration conf) {
  final String poolNameKey = DataSourceNameConfigurator.DATA_SOURCE_NAME;
  return conf.get(poolNameKey);
}

/**
 * Applies database-product-specific pool setup shared by all providers:
 * feeds the product's transaction-prepare statement (when one exists) to
 * {@code initSql}, and each product-specific data source property to
 * {@code dataSourceProps}. The product is determined from the configured
 * connect URL.
 */
static void preparePool(Configuration configuration, Consumer<String> initSql,
    Consumer<Map.Entry<String, String>> dataSourceProps) {
  String connectUrl = MetastoreConf.getVar(configuration, MetastoreConf.ConfVars.CONNECT_URL_KEY);
  DatabaseProduct dbProduct = DatabaseProduct.determineDatabaseProduct(connectUrl, configuration);
  String prepareTxnStmt = dbProduct.getPrepareTxnStmt();
  if (prepareTxnStmt != null) {
    initSql.accept(prepareTxnStmt);
  }
  for (Map.Entry<String, String> property : dbProduct.getDataSourceProperties().entrySet()) {
    dataSourceProps.accept(property);
  }
}

class DataSourceNameConfigurator implements Closeable {
static final String DATA_SOURCE_NAME = "metastore.DataSourceProvider.pool.name";
private final Configuration configuration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.sql.SQLException;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;

import javax.sql.DataSource;

Expand All @@ -32,7 +31,6 @@
import org.apache.commons.pool2.impl.BaseObjectPoolConfig;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.DatabaseProduct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -73,12 +71,6 @@ public DataSource create(Configuration hdpConfig, int maxPoolSize) throws SQLExc
dbcpDs.setDefaultReadOnly(false);
dbcpDs.setDefaultAutoCommit(true);

DatabaseProduct dbProduct = DatabaseProduct.determineDatabaseProduct(driverUrl, hdpConfig);
Map<String, String> props = dbProduct.getDataSourceProperties();
for (Map.Entry<String, String> kv : props.entrySet()) {
dbcpDs.setConnectionProperties(kv.getKey() + "=" + kv.getValue());
}

long connectionTimeout = hdpConfig.getLong(CONNECTION_TIMEOUT_PROPERTY, 30000L);
int connectionMaxIlde = hdpConfig.getInt(CONNECTION_MAX_IDLE_PROPERTY, 8);
int connectionMinIlde = hdpConfig.getInt(CONNECTION_MIN_IDLE_PROPERTY, 0);
Expand Down Expand Up @@ -127,10 +119,16 @@ public DataSource create(Configuration hdpConfig, int maxPoolSize) throws SQLExc
objectPool.setSoftMinEvictableIdleDuration(Duration.ofMillis(600 * 1000));
}
}
String stmt = dbProduct.getPrepareTxnStmt();
if (stmt != null) {
poolableConnFactory.setConnectionInitSql(Collections.singletonList(stmt));
}
StringBuilder connectionProperties = new StringBuilder();
DataSourceProvider.preparePool(hdpConfig,
stmt -> poolableConnFactory.setConnectionInitSql(Collections.singletonList(stmt)),
kv -> {
if (connectionProperties.length() > 0) {
connectionProperties.append(';');
}
connectionProperties.append(kv.getKey()).append('=').append(kv.getValue());
dbcpDs.setConnectionProperties(connectionProperties.toString());
});
Comment thread
dengzhhu653 marked this conversation as resolved.
return new PoolingDataSource(objectPool);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,13 @@
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.DatabaseProduct;
import org.apache.hadoop.hive.metastore.metrics.Metrics;
import org.apache.hadoop.hive.metastore.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.Map;
import java.util.Properties;

/**
Expand Down Expand Up @@ -77,19 +75,8 @@ public DataSource create(Configuration hdpConfig, int maxPoolSize) throws SQLExc
config.setMinimumIdle(Math.min(maxPoolSize, minimumIdle));
}

DatabaseProduct dbProduct = DatabaseProduct.determineDatabaseProduct(driverUrl, hdpConfig);

String s = dbProduct.getPrepareTxnStmt();
if (s!= null) {
config.setConnectionInitSql(s);
}

Map<String, String> props = dbProduct.getDataSourceProperties();

for ( Map.Entry<String, String> kv : props.entrySet()) {
config.addDataSourceProperty(kv.getKey(), kv.getValue());
}

DataSourceProvider.preparePool(hdpConfig, s -> config.setConnectionInitSql(s),
entry -> config.addDataSourceProperty(entry.getKey(), entry.getValue()));
return new HikariDataSource(initMetrics(config));
}

Expand Down
Loading
Loading