Creating an index in Hive


Simple form:

CREATE INDEX idx ON TABLE tbl(col_name) AS 'Index_Handler_QClass_Name' IN TABLE tbl_idx;

Indexing in Hive is pluggable: the AS clause names the fully qualified class that handles the index, e.g. org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler. Index handler classes implement the HiveIndexHandler interface.
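With the compact handler plugged in, the simple form becomes (reusing the placeholder names tbl, col_name and tbl_idx from above):

CREATE INDEX idx
ON TABLE tbl (col_name)
AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler'
IN TABLE tbl_idx;

-- verify the index metadata
SHOW FORMATTED INDEX ON tbl;
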
Full Syntax:
CREATE INDEX index_name
ON TABLE base_table_name (col_name, ...)
AS 'index.handler.class.name'
[WITH DEFERRED REBUILD]
[IDXPROPERTIES (property_name=property_value, ...)]
[IN TABLE index_table_name]
[PARTITIONED BY (col_name, ...)]
[
   [ ROW FORMAT ...] STORED AS ...
   | STORED BY ...
]
[LOCATION hdfs_path]
[TBLPROPERTIES (...)]
[COMMENT "index comment"]
  • WITH DEFERRED REBUILD - the newly created index starts out empty; ALTER INDEX ... REBUILD is then used to build it and bring it up to date.
  • IDXPROPERTIES/TBLPROPERTIES - key/value properties attached to the index and to the index table, respectively.
  • PARTITIONED BY - the table columns by which the index is partitioned; if not specified, the index spans all partitions of the base table.
  • ROW FORMAT - a custom SerDe, or the native SerDe (the Serializer/Deserializer Hive uses for reads and writes). The native SerDe is used if ROW FORMAT is not specified.
  • STORED AS - the storage format of the index table, e.g. RCFILE or SEQUENCEFILE. IN TABLE lets the user give the index table an explicit, unique name; otherwise a name is generated automatically. STORED BY - can be a storage handler such as HBase (I haven't tried it). A full example follows this list.
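
A sketch following the syntax above, with hypothetical table, column and property names (not every optional clause is shown):

CREATE INDEX sales_customer_idx
ON TABLE sales (customer_id)
AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler'
WITH DEFERRED REBUILD
IDXPROPERTIES ('creator'='analytics-team')
IN TABLE sales_customer_idx_tbl
STORED AS RCFILE
COMMENT 'compact index on sales.customer_id';

-- build (or refresh) the index contents
ALTER INDEX sales_customer_idx ON sales REBUILD;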

The index data can be stored in a Hive table, or outside one, e.g. as an RCFILE under an HDFS path. In the latter case the handler's usesIndexTable() method returns false. When an index is created, generateIndexBuildTaskList(...) in the handler class generates the plan for building it.

Consider the CompactIndexHandler shipped with the Hive distribution.

For each value of the indexed column it stores only the addresses of the HDFS blocks containing that value. The extra columns are registered in the metastore as FieldSchemas named _bucketname and _offsets on the index table.

That is, for a single indexed column the index table contains three columns: the indexed column itself (its name taken from the base table's FieldSchema), _bucketname (the HDFS file of the table or partition that holds the value), and _offsets (an array of block offsets within that file).
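
The build step therefore boils down to an INSERT OVERWRITE of the index table, grouped by the indexed column and file name and using Hive's virtual columns; roughly (again with the placeholder names tbl, col_name and tbl_idx):

INSERT OVERWRITE TABLE tbl_idx
SELECT col_name, INPUT__FILE__NAME, collect_set(BLOCK__OFFSET__INSIDE__FILE)
FROM tbl
GROUP BY col_name, INPUT__FILE__NAME;

This is essentially the query string assembled in getIndexBuilderMapRedTask() in the code below (modulo partition handling and identifier quoting).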



See the code of CompactIndexHandler:
   
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.index.compact;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Index;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.exec.FilterOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.index.HiveIndexQueryContext;
import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
import org.apache.hadoop.hive.ql.index.TableBasedIndexHandler;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler.DecomposedPredicate;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.optimizer.IndexUtils;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotNull;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNull;
public class CompactIndexHandler extends TableBasedIndexHandler {

  private Configuration configuration;
  // The names of the partition columns
  private Set<String> partitionCols;
  // Whether or not the conditions have been met to use the fact the index is sorted
  private boolean useSorted;
  private static final Log LOG = LogFactory.getLog(CompactIndexHandler.class.getName());

  @Override
  public void analyzeIndexDefinition(Table baseTable, Index index,
      Table indexTable) throws HiveException {
    StorageDescriptor storageDesc = index.getSd();
    if (this.usesIndexTable() && indexTable != null) {
      StorageDescriptor indexTableSd = storageDesc.deepCopy();
      List<FieldSchema> indexTblCols = indexTableSd.getCols();
      FieldSchema bucketFileName = new FieldSchema("_bucketname", "string", "");
      indexTblCols.add(bucketFileName);
      FieldSchema offSets = new FieldSchema("_offsets", "array<bigint>", "");
      indexTblCols.add(offSets);
      indexTable.setSd(indexTableSd);
    }
  }
  @Override
  protected Task<?> getIndexBuilderMapRedTask(Set<ReadEntity> inputs, Set<WriteEntity> outputs,
      List<FieldSchema> indexField, boolean partitioned,
      PartitionDesc indexTblPartDesc, String indexTableName,
      PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException {

    String indexCols = HiveUtils.getUnparsedColumnNamesFromFieldSchema(indexField);

    //form a new insert overwrite query.
    StringBuilder command= new StringBuilder();
    LinkedHashMap<String, String> partSpec = indexTblPartDesc.getPartSpec();

    command.append("INSERT OVERWRITE TABLE " + HiveUtils.unparseIdentifier(indexTableName ));
    if (partitioned && indexTblPartDesc != null) {
      command.append(" PARTITION ( ");
      List<String> ret = getPartKVPairStringArray(partSpec);
      for (int i = 0; i < ret.size(); i++) {
        String partKV = ret.get(i);
        command.append(partKV);
        if (i < ret.size() - 1) {
          command.append(",");
        }
      }
      command.append(" ) ");
    }

    command.append(" SELECT ");
    command.append(indexCols);
    command.append(",");
    command.append(VirtualColumn.FILENAME.getName());
    command.append(",");
    command.append(" collect_set (");
    command.append(VirtualColumn.BLOCKOFFSET.getName());
    command.append(") ");
    command.append(" FROM " + HiveUtils.unparseIdentifier(baseTableName) );
    LinkedHashMap<String, String> basePartSpec = baseTablePartDesc.getPartSpec();
    if(basePartSpec != null) {
      command.append(" WHERE ");
      List<String> pkv = getPartKVPairStringArray(basePartSpec);
      for (int i = 0; i < pkv.size(); i++) {
        String partKV = pkv.get(i);
        command.append(partKV);
        if (i < pkv.size() - 1) {
          command.append(" AND ");
        }
      }
    }
    command.append(" GROUP BY ");
    command.append(indexCols + ", " + VirtualColumn.FILENAME.getName());

    HiveConf builderConf = new HiveConf(getConf(), CompactIndexHandler.class);
    builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGEMAPFILES, false);
    builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGEMAPREDFILES, false);
    Task<?> rootTask = IndexUtils.createRootTask(builderConf, inputs, outputs,
        command, partSpec, indexTableName, dbName);
    return rootTask;
  }
  @Override
  public void generateIndexQuery(List<Index> indexes, ExprNodeDesc predicate,
      ParseContext pctx, HiveIndexQueryContext queryContext) {

    Index index = indexes.get(0);
    DecomposedPredicate decomposedPredicate = decomposePredicate(predicate, index,
        queryContext.getQueryPartitions());

    if (decomposedPredicate == null) {
      queryContext.setQueryTasks(null);
      return; // abort if we couldn't pull out anything from the predicate
    }

    // pass residual predicate back out for further processing
    queryContext.setResidualPredicate(decomposedPredicate.residualPredicate);
    // setup TableScanOperator to change input format for original query
    queryContext.setIndexInputFormat(HiveCompactIndexInputFormat.class.getName());

    // Build reentrant QL for index query
    StringBuilder qlCommand = new StringBuilder("INSERT OVERWRITE DIRECTORY ");

    String tmpFile = pctx.getContext().getMRTmpFileURI();
    queryContext.setIndexIntermediateFile(tmpFile);
    qlCommand.append( "\"" + tmpFile + "\" "); // QL includes " around file name
    qlCommand.append("SELECT `_bucketname` , `_offsets` FROM ");
    qlCommand.append(HiveUtils.unparseIdentifier(index.getIndexTableName()));
    qlCommand.append(" WHERE ");

    String predicateString = decomposedPredicate.pushedPredicate.getExprString();
    qlCommand.append(predicateString);

    // generate tasks from index query string
    LOG.info("Generating tasks for re-entrant QL query: " + qlCommand.toString());
    HiveConf queryConf = new HiveConf(pctx.getConf(), CompactIndexHandler.class);
    HiveConf.setBoolVar(queryConf, HiveConf.ConfVars.COMPRESSRESULT, false);
    Driver driver = new Driver(queryConf);
    driver.compile(qlCommand.toString(), false);

    if (pctx.getConf().getBoolVar(ConfVars.HIVE_INDEX_COMPACT_BINARY_SEARCH) && useSorted) {
      // For now, only works if the predicate is a single condition
      MapredWork work = null;
      String originalInputFormat = null;
      for (Task task : driver.getPlan().getRootTasks()) {
        // The index query should have one and only one map reduce task in the root tasks
        // Otherwise something is wrong, log the problem and continue using the default format
        if (task.getWork() instanceof MapredWork) {
          if (work != null) {
            LOG.error("Tried to use a binary search on a compact index but there were an " +
                "unexpected number (>1) of root level map reduce tasks in the " +
                "reentrant query plan.");
            work.setInputformat(null);
            work.setInputFormatSorted(false);
            break;
          }
          work = (MapredWork)task.getWork();
          String inputFormat = work.getInputformat();
          originalInputFormat = inputFormat;
          if (inputFormat == null) {
            inputFormat = HiveConf.getVar(pctx.getConf(), HiveConf.ConfVars.HIVEINPUTFORMAT);
          }

          // We can only perform a binary search with HiveInputFormat and CombineHiveInputFormat
          // and BucketizedHiveInputFormat
          try {
            if (!HiveInputFormat.class.isAssignableFrom(Class.forName(inputFormat))) {
              work = null;
              break;
            }
          } catch (ClassNotFoundException e) {
            LOG.error("Map reduce work's input format class: " + inputFormat + " was not found. " +
                "Cannot use the fact the compact index is sorted.");
            work = null;
            break;
          }

          work.setInputFormatSorted(true);
        }
      }

      if (work != null) {
        // Find the filter operator and expr node which act on the index column and mark them
        if (!findIndexColumnFilter(work.getAliasToWork().values())) {
          LOG.error("Could not locate the index column's filter operator and expr node. Cannot " +
              "use the fact the compact index is sorted.");
          work.setInputformat(originalInputFormat);
          work.setInputFormatSorted(false);
        }
      }
    }

    queryContext.addAdditionalSemanticInputs(driver.getPlan().getInputs());
    queryContext.setQueryTasks(driver.getPlan().getRootTasks());
    return;
  }
  /**
   * Does a depth first search on the operator tree looking for a filter operator whose predicate
   * has one child which is a column which is not in the partition
   * @param operators
   * @return whether or not it has found its target
   */
  private boolean findIndexColumnFilter(Collection<Operator<? extends Serializable>> operators) {
    for (Operator<? extends Serializable> op : operators) {
      if (op instanceof FilterOperator &&
          ((FilterOperator)op).getConf().getPredicate().getChildren() != null) {
        // Is this the target
        if (findIndexColumnExprNodeDesc(((FilterOperator)op).getConf().getPredicate())) {
          ((FilterOperator)op).getConf().setSortedFilter(true);
          return true;
        }
      }

      // If the target has been found, no need to continue
      if (findIndexColumnFilter(op.getChildOperators())) {
        return true;
      }
    }
    return false;
  }

  private boolean findIndexColumnExprNodeDesc(ExprNodeDesc expression) {
    if (expression.getChildren() == null) {
      return false;
    }

    if (expression.getChildren().size() == 2) {
      ExprNodeColumnDesc columnDesc = null;
      if (expression.getChildren().get(0) instanceof ExprNodeColumnDesc) {
        columnDesc = (ExprNodeColumnDesc)expression.getChildren().get(0);
      } else if (expression.getChildren().get(1) instanceof ExprNodeColumnDesc) {
        columnDesc = (ExprNodeColumnDesc)expression.getChildren().get(1);
      }

      // Is this the target
      if (columnDesc != null && !partitionCols.contains(columnDesc.getColumn())) {
        assert expression instanceof ExprNodeGenericFuncDesc :
            "Expression containing index column is does not support sorting, should not try" +
            "and sort";
        ((ExprNodeGenericFuncDesc)expression).setSortedExpr(true);
        return true;
      }
    }

    for (ExprNodeDesc child : expression.getChildren()) {
      // If the target has been found, no need to continue
      if (findIndexColumnExprNodeDesc(child)) {
        return true;
      }
    }
    return false;
  }
  /**
   * Split the predicate into the piece we can deal with (pushed), and the one we can't (residual)
   * @param predicate
   * @param index
   * @return
   */
  private DecomposedPredicate decomposePredicate(ExprNodeDesc predicate, Index index,
      Set<Partition> queryPartitions) {
    IndexPredicateAnalyzer analyzer = getIndexPredicateAnalyzer(index, queryPartitions);
    List<IndexSearchCondition> searchConditions = new ArrayList<IndexSearchCondition>();
    // split predicate into pushed (what we can handle), and residual (what we can't handle)
    ExprNodeDesc residualPredicate = analyzer.analyzePredicate(predicate, searchConditions);

    if (searchConditions.size() == 0) {
      return null;
    }

    int numIndexCols = 0;
    for (IndexSearchCondition searchCondition : searchConditions) {
      if (!partitionCols.contains(searchCondition.getColumnDesc().getColumn())) {
        numIndexCols++;
      }
    }

    // For now, only works if the predicate has a single condition on an index column
    if (numIndexCols == 1) {
      useSorted = true;
    } else {
      useSorted = false;
    }

    DecomposedPredicate decomposedPredicate = new DecomposedPredicate();
    decomposedPredicate.pushedPredicate = analyzer.translateSearchConditions(searchConditions);
    decomposedPredicate.residualPredicate = residualPredicate;
    return decomposedPredicate;
  }
  /**
   * Instantiate a new predicate analyzer suitable for determining
   * whether we can use an index, based on rules for indexes in
   * WHERE clauses that we support
   *
   * @return preconfigured predicate analyzer for WHERE queries
   */
  private IndexPredicateAnalyzer getIndexPredicateAnalyzer(Index index, Set<Partition> queryPartitions) {
    IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer();

    analyzer.addComparisonOp(GenericUDFOPEqual.class.getName());
    analyzer.addComparisonOp(GenericUDFOPLessThan.class.getName());
    analyzer.addComparisonOp(GenericUDFOPEqualOrLessThan.class.getName());
    analyzer.addComparisonOp(GenericUDFOPGreaterThan.class.getName());
    analyzer.addComparisonOp(GenericUDFOPEqualOrGreaterThan.class.getName());

    // only return results for columns in this index
    List<FieldSchema> columnSchemas = index.getSd().getCols();
    for (FieldSchema column : columnSchemas) {
      analyzer.allowColumnName(column.getName());
    }

    // partitioned columns are treated as if they have indexes so that the partitions
    // are used during the index query generation
    partitionCols = new HashSet<String>();
    for (Partition part : queryPartitions) {
      if (part.getSpec().isEmpty()) {
        continue; // empty partitions are from whole tables, so we don't want to add them in
      }
      for (String column : part.getSpec().keySet()) {
        analyzer.allowColumnName(column);
        partitionCols.add(column);
      }
    }

    return analyzer;
  }
  @Override
  public boolean checkQuerySize(long querySize, HiveConf hiveConf) {
    long minSize = hiveConf.getLongVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER_COMPACT_MINSIZE);
    long maxSize = hiveConf.getLongVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER_COMPACT_MAXSIZE);
    if (maxSize < 0) {
      maxSize = Long.MAX_VALUE;
    }
    return (querySize > minSize & querySize < maxSize);
  }

  @Override
  public boolean usesIndexTable() {
    return true;
  }
}
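
Once built, the optimizer can rewrite filtering queries through the compact index when index-based filtering is enabled. A rough sketch of the session settings involved (the property names correspond to the ConfVars used in checkQuerySize() and the predicate analysis above; verify them against your Hive version's documentation) and a query that could benefit, using the placeholder names from earlier:

SET hive.optimize.index.filter=true;
SET hive.optimize.index.filter.compact.minsize=0;

SELECT * FROM tbl WHERE col_name = 'some_value';

Internally, generateIndexQuery() compiles a reentrant query of the form SELECT `_bucketname`, `_offsets` FROM <index table> WHERE <pushed predicate> into a temporary directory, and the original query is then re-planned to read only those blocks via HiveCompactIndexInputFormat.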
