diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift index c051dc8383..e616e7c3bb 100644 --- a/common/thrift/CatalogObjects.thrift +++ b/common/thrift/CatalogObjects.thrift @@ -32,6 +32,7 @@ enum TCatalogObjectType { TABLE, VIEW, FUNCTION, + GLOBAL_INDEX, DATA_SOURCE, ROLE, PRIVILEGE, @@ -265,6 +266,41 @@ struct TDataSourceTable { 2: required string init_string } +// Represents a point +struct TPoint { + 1: required double x + 2: required double y +} + +// Represents a rectangle used in a global index record for spatial tables. +struct TRectangle { + 1: required double x1 + 2: required double y1 + 3: required double x2 + 4: required double y2 +} + +// Represents a global index record for spatial tables. +struct TGlobalIndexRecord { + // Id of the record. + 1: required i32 id + + // Tag of the record. + 2: required string tag + + // MBR of the record. + 3: required TRectangle mbr +} + +// Represents a global index used for spatial tables. +struct TGlobalIndex { + // Name of the table that this global index belongs to. + 1: required string tbl_name + + // Map of global indexes' records. + 2: required map globalIndexMap +} + // Represents a table or view. struct TTable { // Name of the parent database. Case insensitive, expected to be stored as lowercase. @@ -309,6 +345,9 @@ struct TTable { // Set iff this is a table from an external data source 13: optional TDataSourceTable data_source_table + + // Set iff this table is spatial with initialized global index. + 14: optional TGlobalIndex globalIndex; } // Represents a database. diff --git a/fe/src/main/cup/sql-parser.y b/fe/src/main/cup/sql-parser.y index 22051f7089..1bd79527f9 100644 --- a/fe/src/main/cup/sql-parser.y +++ b/fe/src/main/cup/sql-parser.y @@ -14,6 +14,11 @@ package com.cloudera.impala.analysis; +import org.gistic.spatialImpala.catalog.Point; +import org.gistic.spatialImpala.catalog.Rectangle; +import org.gistic.spatialImpala.analysis.SpatialPointInclusionStmt; +import org.gistic.spatialImpala.analysis.SpatialKnnStmt; + import com.cloudera.impala.catalog.Type; import com.cloudera.impala.catalog.ScalarType; import com.cloudera.impala.catalog.ArrayType; @@ -239,11 +244,11 @@ terminal KW_FIRST, KW_FLOAT, KW_FOLLOWING, KW_FOR, KW_FORMAT, KW_FORMATTED, KW_FROM, KW_FULL, KW_FUNCTION, KW_FUNCTIONS, KW_GRANT, KW_GROUP, KW_HAVING, KW_IF, KW_IN, KW_INIT_FN, KW_INNER, KW_INPATH, KW_INSERT, KW_INT, KW_INTERMEDIATE, KW_INTERVAL, KW_INTO, - KW_INVALIDATE, KW_IS, KW_JOIN, KW_LAST, KW_LEFT, KW_LIKE, KW_LIMIT, KW_LINES, KW_LOAD, + KW_INVALIDATE, KW_IS, KW_JOIN, KW_KNN, KW_LAST, KW_LEFT, KW_LIKE, KW_LIMIT, KW_LINES, KW_LOAD, KW_LOCATION, KW_MAP, KW_MERGE_FN, KW_METADATA, KW_NOT, KW_NULL, KW_NULLS, KW_OFFSET, - KW_ON, KW_OR, KW_ORDER, KW_OUTER, KW_OVER, KW_OVERWRITE, KW_PARQUET, KW_PARQUETFILE, - KW_PARTITION, KW_PARTITIONED, KW_PARTITIONS, KW_PRECEDING, - KW_PREPARE_FN, KW_PRODUCED, KW_RANGE, KW_RCFILE, KW_REFRESH, KW_REGEXP, KW_RENAME, + KW_ON, KW_OR, KW_ORDER, KW_OUTER, KW_OVER, KW_OVERLAPS, KW_OVERWRITE, KW_PARQUET, KW_PARQUETFILE, + KW_PARTITION, KW_PARTITIONED, KW_PARTITIONS, KW_POINT, KW_POINTS, KW_PRECEDING, + KW_PREPARE_FN, KW_PRODUCED, KW_RANGE, KW_RCFILE, KW_RECTANGLE, KW_REFRESH, KW_REGEXP, KW_RENAME, KW_REPLACE, KW_RETURNS, KW_REVOKE, KW_RIGHT, KW_RLIKE, KW_ROLE, KW_ROLES, KW_ROW, KW_ROWS, KW_SCHEMA, KW_SCHEMAS, KW_SELECT, KW_SEMI, KW_SEQUENCEFILE, KW_SERDEPROPERTIES, KW_SERIALIZE_FN, KW_SET, KW_SHOW, KW_SMALLINT, KW_STORED, KW_STRAIGHT_JOIN, @@ -251,7 +256,7 @@ terminal KW_TEXTFILE, KW_THEN, 
KW_TIMESTAMP, KW_TINYINT, KW_STATS, KW_TO, KW_TRUE, KW_UNBOUNDED, KW_UNCACHED, KW_UNION, KW_UPDATE_FN, KW_USE, KW_USING, - KW_VALUES, KW_VARCHAR, KW_VIEW, KW_WHEN, KW_WHERE, KW_WITH; + KW_VALUES, KW_VARCHAR, KW_VIEW, KW_WHEN, KW_WHERE, KW_WITH, KW_WITHDIST, KW_WITHK; terminal COLON, COMMA, DOT, DOTDOTDOT, STAR, LPAREN, RPAREN, LBRACKET, RBRACKET, DIVIDE, MOD, ADD, SUBTRACT; @@ -438,6 +443,14 @@ nonterminal TFunctionCategory opt_function_category; nonterminal HashMap create_function_args_map; nonterminal CreateFunctionStmtBase.OptArg create_function_arg_key; +// For Spatial related queries +nonterminal SpatialPointInclusionStmt spatial_point_inclusion_stmt; +nonterminal Rectangle rectangle; +nonterminal NumericLiteral rectangle_arg; +nonterminal SpatialKnnStmt spatial_knn_stmt; +nonterminal Point point; +nonterminal NumericLiteral point_arg; + precedence left KW_OR; precedence left KW_AND; precedence left KW_NOT, NOT; @@ -549,6 +562,46 @@ stmt ::= {: RESULT = grant_privilege; :} | revoke_privilege_stmt:revoke_privilege {: RESULT = revoke_privilege; :} + | spatial_point_inclusion_stmt:spatial_point_inclusion + {: RESULT = spatial_point_inclusion; :} + | spatial_knn_stmt:spatial_knn + {: RESULT = spatial_knn; :} + ; + +spatial_point_inclusion_stmt ::= + KW_LOAD KW_POINTS KW_FROM KW_TABLE table_name:tbl KW_OVERLAPS rectangle:rect + {: RESULT = new SpatialPointInclusionStmt(tbl, rect); :} + ; + +spatial_knn_stmt ::= + KW_LOAD KW_POINTS KW_FROM KW_TABLE table_name:tbl KW_KNN point:p KW_WITHK expr:k KW_WITHDIST order_by_elements:dist + {: RESULT = new SpatialKnnStmt(tbl,p,new LimitElement(k, null), dist); :} + ; + +rectangle ::= + KW_RECTANGLE LPAREN rectangle_arg:x1 COMMA rectangle_arg:y1 COMMA + rectangle_arg:x2 COMMA rectangle_arg:y2 RPAREN + {: RESULT = new Rectangle(x1.getDoubleValue(), y1.getDoubleValue(), + x2.getDoubleValue(), y2.getDoubleValue()); :} + ; + +point ::= + KW_POINT LPAREN point_arg:x COMMA point_arg:y RPAREN + {: RESULT = new Point(x.getDoubleValue(), y.getDoubleValue()); :} + ; + +rectangle_arg ::= + INTEGER_LITERAL:l + {: RESULT = new NumericLiteral(l); :} + | DECIMAL_LITERAL:l + {: RESULT = new NumericLiteral(l); :} + ; + +point_arg ::= + INTEGER_LITERAL:l + {: RESULT = new NumericLiteral(l); :} + | DECIMAL_LITERAL:l + {: RESULT = new NumericLiteral(l); :} ; load_stmt ::= diff --git a/fe/src/main/java/com/cloudera/impala/analysis/AnalysisContext.java b/fe/src/main/java/com/cloudera/impala/analysis/AnalysisContext.java index 25cada1d78..a4103ab6f8 100644 --- a/fe/src/main/java/com/cloudera/impala/analysis/AnalysisContext.java +++ b/fe/src/main/java/com/cloudera/impala/analysis/AnalysisContext.java @@ -27,6 +27,8 @@ import com.cloudera.impala.thrift.TQueryCtx; import com.google.common.base.Preconditions; +import org.gistic.spatialImpala.analysis.*; + /** * Wrapper class for parser and analyzer. 
*/ @@ -54,7 +56,11 @@ static public class AnalysisResult { public boolean isAlterTableStmt() { return stmt_ instanceof AlterTableStmt; } public boolean isAlterViewStmt() { return stmt_ instanceof AlterViewStmt; } public boolean isComputeStatsStmt() { return stmt_ instanceof ComputeStatsStmt; } - public boolean isQueryStmt() { return stmt_ instanceof QueryStmt; } + public boolean isQueryStmt() { + return (stmt_ instanceof QueryStmt + || stmt_ instanceof SpatialPointInclusionStmt + || stmt_ instanceof SpatialKnnStmt); + } public boolean isInsertStmt() { return stmt_ instanceof InsertStmt; } public boolean isDropDbStmt() { return stmt_ instanceof DropDbStmt; } public boolean isDropTableOrViewStmt() { @@ -203,7 +209,14 @@ public LoadDataStmt getLoadDataStmt() { public QueryStmt getQueryStmt() { Preconditions.checkState(isQueryStmt()); - return (QueryStmt) stmt_; + if (stmt_ instanceof QueryStmt) + return (QueryStmt) stmt_; + else if (stmt_ instanceof SpatialPointInclusionStmt) + return ((SpatialPointInclusionStmt) stmt_).getSelectStmtIfAny(); + else if (stmt_ instanceof SpatialKnnStmt) + return ((SpatialKnnStmt) stmt_).getSelectStmtIfAny(); + else + return null; } public InsertStmt getInsertStmt() { diff --git a/fe/src/main/java/com/cloudera/impala/analysis/CreateTableStmt.java b/fe/src/main/java/com/cloudera/impala/analysis/CreateTableStmt.java index 9c347dc98e..31f0c36345 100644 --- a/fe/src/main/java/com/cloudera/impala/analysis/CreateTableStmt.java +++ b/fe/src/main/java/com/cloudera/impala/analysis/CreateTableStmt.java @@ -37,6 +37,8 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import org.gistic.spatialImpala.catalog.GlobalIndex; + /** * Represents a CREATE TABLE statement. */ @@ -205,7 +207,15 @@ public void analyze(Analyzer analyzer) throws AnalysisException { } if (location_ != null) location_.analyze(analyzer, Privilege.ALL); - + + // If the table is Spatial, ensure that the global index path exists + // and is valid. + if (tblProperties_ != null) { + String globalIndexPathIfAny = tblProperties_.get(GlobalIndex.GLOBAL_INDEX_TABLE_PARAM); + if (globalIndexPathIfAny != null) + (new HdfsUri(globalIndexPathIfAny)).analyze(analyzer, Privilege.ALL); + } + analyzeRowFormatValue(rowFormat_.getFieldDelimiter()); analyzeRowFormatValue(rowFormat_.getLineDelimiter()); analyzeRowFormatValue(rowFormat_.getEscapeChar()); diff --git a/fe/src/main/java/com/cloudera/impala/analysis/LimitElement.java b/fe/src/main/java/com/cloudera/impala/analysis/LimitElement.java index f2f1ef3c57..f7e211b867 100644 --- a/fe/src/main/java/com/cloudera/impala/analysis/LimitElement.java +++ b/fe/src/main/java/com/cloudera/impala/analysis/LimitElement.java @@ -23,7 +23,7 @@ /** * Combination of limit and offset expressions. 
*/ -class LimitElement { +public class LimitElement { private final Expr limitExpr_; private final Expr offsetExpr_; private long limit_; diff --git a/fe/src/main/java/com/cloudera/impala/analysis/SelectListItem.java b/fe/src/main/java/com/cloudera/impala/analysis/SelectListItem.java index 03b6cd2505..5398be2303 100644 --- a/fe/src/main/java/com/cloudera/impala/analysis/SelectListItem.java +++ b/fe/src/main/java/com/cloudera/impala/analysis/SelectListItem.java @@ -16,7 +16,7 @@ import com.google.common.base.Preconditions; -class SelectListItem { +public class SelectListItem { private final Expr expr_; private String alias_; diff --git a/fe/src/main/java/com/cloudera/impala/analysis/SelectStmt.java b/fe/src/main/java/com/cloudera/impala/analysis/SelectStmt.java index 9f5e5c50fb..9b27c8c906 100644 --- a/fe/src/main/java/com/cloudera/impala/analysis/SelectStmt.java +++ b/fe/src/main/java/com/cloudera/impala/analysis/SelectStmt.java @@ -65,7 +65,7 @@ public class SelectStmt extends QueryStmt { // directly private ExprSubstitutionMap baseTblSmap_ = new ExprSubstitutionMap(); - SelectStmt(SelectList selectList, + public SelectStmt(SelectList selectList, List tableRefList, Expr wherePredicate, ArrayList groupingExprs, Expr havingPredicate, ArrayList orderByElements, diff --git a/fe/src/main/java/com/cloudera/impala/analysis/StatementBase.java b/fe/src/main/java/com/cloudera/impala/analysis/StatementBase.java index 86effef23f..4c2732e06e 100644 --- a/fe/src/main/java/com/cloudera/impala/analysis/StatementBase.java +++ b/fe/src/main/java/com/cloudera/impala/analysis/StatementBase.java @@ -19,7 +19,7 @@ /** * Base class for all Impala SQL statements. */ -abstract class StatementBase implements ParseNode { +public abstract class StatementBase implements ParseNode { // True if this Stmt is the top level of an explain stmt. protected boolean isExplain_ = false; diff --git a/fe/src/main/java/com/cloudera/impala/catalog/Table.java b/fe/src/main/java/com/cloudera/impala/catalog/Table.java index cda21d8234..cf34842e3f 100644 --- a/fe/src/main/java/com/cloudera/impala/catalog/Table.java +++ b/fe/src/main/java/com/cloudera/impala/catalog/Table.java @@ -38,6 +38,8 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.gistic.spatialImpala.catalog.SpatialHdfsTable; + /** * Base class for table metadata. * @@ -108,7 +110,7 @@ protected Table(TableId id, org.apache.hadoop.hive.metastore.api.Table msTable, */ public abstract void load(Table oldValue, HiveMetaStoreClient client, org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException; - + public void addColumn(Column col) { colsByPos_.add(col); colsByName_.put(col.getName().toLowerCase(), col); @@ -195,7 +197,11 @@ public static Table fromMetastoreTable(TableId id, Db db, // data source. table = new DataSourceTable(id, msTbl, db, msTbl.getTableName(), msTbl.getOwner()); } else if (HdfsFileFormat.isHdfsFormatClass(msTbl.getSd().getInputFormat())) { - table = new HdfsTable(id, msTbl, db, msTbl.getTableName(), msTbl.getOwner()); + // Checking if the table is Spatial or not. 
+ if (SpatialHdfsTable.isSpatial(msTbl)) + table = new SpatialHdfsTable(id, msTbl, db, msTbl.getTableName(), msTbl.getOwner()); + else + table = new HdfsTable(id, msTbl, db, msTbl.getTableName(), msTbl.getOwner()); } return table; } diff --git a/fe/src/main/java/org/gistic/spatialImpala/analysis/SpatialKnnStmt.java b/fe/src/main/java/org/gistic/spatialImpala/analysis/SpatialKnnStmt.java new file mode 100644 index 0000000000..781bc19408 --- /dev/null +++ b/fe/src/main/java/org/gistic/spatialImpala/analysis/SpatialKnnStmt.java @@ -0,0 +1,137 @@ +// Copyright 2014 GISTIC. + +package org.gistic.spatialImpala.analysis; + +import org.gistic.spatialImpala.catalog.*; + +import com.cloudera.impala.catalog.Table; +import com.cloudera.impala.authorization.Privilege; +import com.cloudera.impala.common.AnalysisException; +import com.cloudera.impala.analysis.Analyzer; +import com.cloudera.impala.analysis.StatementBase; +import com.cloudera.impala.analysis.SelectStmt; +import com.cloudera.impala.analysis.SelectListItem; +import com.cloudera.impala.analysis.SelectList; +import com.cloudera.impala.analysis.TableName; +import com.cloudera.impala.analysis.TableRef; +import com.cloudera.impala.analysis.SlotRef; +import com.cloudera.impala.analysis.IsNullPredicate; +import com.cloudera.impala.analysis.Expr; +import com.cloudera.impala.analysis.CompoundPredicate; +import com.cloudera.impala.analysis.BinaryPredicate; +import com.cloudera.impala.analysis.IsNullPredicate; +import com.cloudera.impala.analysis.StringLiteral; +import com.cloudera.impala.analysis.NumericLiteral; +import com.cloudera.impala.analysis.LimitElement; +import com.cloudera.impala.analysis.OrderByElement; + +import java.util.List; +import java.util.ArrayList; +import java.math.BigDecimal; + +/** + * Represents a Spatial Point Inclusion statement + */ +public class SpatialKnnStmt extends StatementBase { + private static final String TABLE_NOT_SPATIAL_ERROR_MSG = "Table is not a spatial table."; + private static final String TAG = "tag"; + private static final String X = "x"; + private static final String Y = "y"; + private TableName tableName_; + private final Point p_; + private final LimitElement k_; + private final ArrayList dist_; + + // Initialized during analysis. + private SelectStmt selectStmt_; + + public SpatialKnnStmt(TableName tblName, Point p, LimitElement k, ArrayList dist) { + this.tableName_ = tblName; + this.p_ = p; + this.k_ = k; + this.dist_=dist; + this.selectStmt_ = null; + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException { + // Getting table and checking for existence. + Table table; + if (!tableName_.isFullyQualified()) { + tableName_ = new TableName(analyzer.getDefaultDb(), + tableName_.getTbl()); + } + table = analyzer.getTable(tableName_, Privilege.SELECT); + + // Table should be an instance of a Spatial table. + if (!(table instanceof SpatialHdfsTable)) + throw new AnalysisException(TABLE_NOT_SPATIAL_ERROR_MSG); + + SpatialHdfsTable spatialTable = (SpatialHdfsTable) table; + + // Global index shouldn't be null. 
+ GlobalIndex globalIndex = spatialTable.getGlobalIndexIfAny(); + if (globalIndex == null) + throw new AnalysisException(TABLE_NOT_SPATIAL_ERROR_MSG + + " : Table doesn't have global indexes."); + + List columnNames = spatialTable.getColumnNames(); + if (!(columnNames.contains(TAG) && columnNames.contains(X) && columnNames + .contains(Y))) + throw new AnalysisException(TABLE_NOT_SPATIAL_ERROR_MSG + + " : Table doesn't have the required columns."); + + List globalIndexes = globalIndex + .getGIsforKnn(p_); + + // Preparing data for SelectStmt. + List tableRefs = new ArrayList(); + tableRefs.add(new TableRef(tableName_, null)); + + List items = new ArrayList(); + items.add(new SelectListItem(new SlotRef(tableName_, X), null)); + items.add(new SelectListItem(new SlotRef(tableName_, Y), null)); + + selectStmt_ = new SelectStmt(new SelectList(items), tableRefs, + createWherePredicate(globalIndexes), null, null, dist_, k_); + + selectStmt_.analyze(analyzer); + } + + @Override + public String toSql() { + return "load points from table " + tableName_.getTbl() + + " knn " + p_.toString() + "with_k " + k_ + ";"; + } + + public SelectStmt getSelectStmtIfAny() { + return selectStmt_; + } + + private Expr createWherePredicate(List globalIndexes) { + SlotRef globalIndexSlotRef = new SlotRef(tableName_, TAG); + if (globalIndexes == null || globalIndexes.size() == 0) { + return new IsNullPredicate(globalIndexSlotRef, false); + } + + // Creating Where predicate. + Expr wherePredicate = null; + + // Create Global Index predicate. + wherePredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ, + globalIndexSlotRef, new StringLiteral(globalIndexes.get(0) + .getTag())); + + for (int i = 1; i < globalIndexes.size(); i++) { + Expr globalIndexPredicate = new BinaryPredicate( + BinaryPredicate.Operator.EQ, globalIndexSlotRef, + new StringLiteral(globalIndexes.get(i).getTag())); + + wherePredicate = new CompoundPredicate( + CompoundPredicate.Operator.OR, wherePredicate, + globalIndexPredicate); + } + + return wherePredicate; + } +} diff --git a/fe/src/main/java/org/gistic/spatialImpala/analysis/SpatialPointInclusionStmt.java b/fe/src/main/java/org/gistic/spatialImpala/analysis/SpatialPointInclusionStmt.java new file mode 100644 index 0000000000..cfa6d9691a --- /dev/null +++ b/fe/src/main/java/org/gistic/spatialImpala/analysis/SpatialPointInclusionStmt.java @@ -0,0 +1,155 @@ +// Copyright 2014 GISTIC. 
+ +package org.gistic.spatialImpala.analysis; + +import org.gistic.spatialImpala.catalog.*; + +import com.cloudera.impala.catalog.Table; +import com.cloudera.impala.authorization.Privilege; +import com.cloudera.impala.common.AnalysisException; +import com.cloudera.impala.analysis.Analyzer; +import com.cloudera.impala.analysis.StatementBase; +import com.cloudera.impala.analysis.SelectStmt; +import com.cloudera.impala.analysis.SelectListItem; +import com.cloudera.impala.analysis.SelectList; +import com.cloudera.impala.analysis.TableName; +import com.cloudera.impala.analysis.TableRef; +import com.cloudera.impala.analysis.SlotRef; +import com.cloudera.impala.analysis.IsNullPredicate; +import com.cloudera.impala.analysis.Expr; +import com.cloudera.impala.analysis.CompoundPredicate; +import com.cloudera.impala.analysis.BinaryPredicate; +import com.cloudera.impala.analysis.IsNullPredicate; +import com.cloudera.impala.analysis.StringLiteral; +import com.cloudera.impala.analysis.NumericLiteral; + +import java.util.List; +import java.util.ArrayList; +import java.math.BigDecimal; + +/** + * Represents a Spatial Point Inclusion statement + */ +public class SpatialPointInclusionStmt extends StatementBase { + private static final String TABLE_NOT_SPATIAL_ERROR_MSG = "Table is not a spatial table."; + private static final String TAG = "tag"; + private static final String X = "x"; + private static final String Y = "y"; + private TableName tableName_; + private final Rectangle rect_; + + // Initialized during analysis. + private SelectStmt selectStmt_; + + public SpatialPointInclusionStmt(TableName tblName, Rectangle rect) { + this.tableName_ = tblName; + this.rect_ = rect; + this.selectStmt_ = null; + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException { + // Getting table and checking for existence. + Table table; + if (!tableName_.isFullyQualified()) { + tableName_ = new TableName(analyzer.getDefaultDb(), + tableName_.getTbl()); + } + table = analyzer.getTable(tableName_, Privilege.SELECT); + + // Table should be an instance of a Spatial table. + if (!(table instanceof SpatialHdfsTable)) + throw new AnalysisException(TABLE_NOT_SPATIAL_ERROR_MSG); + + SpatialHdfsTable spatialTable = (SpatialHdfsTable) table; + + // Global index shouldn't be null. + GlobalIndex globalIndex = spatialTable.getGlobalIndexIfAny(); + if (globalIndex == null) + throw new AnalysisException(TABLE_NOT_SPATIAL_ERROR_MSG + + " : Table doesn't have global indexes."); + + List columnNames = spatialTable.getColumnNames(); + if (!(columnNames.contains(TAG) && columnNames.contains(X) && columnNames + .contains(Y))) + throw new AnalysisException(TABLE_NOT_SPATIAL_ERROR_MSG + + " : Table doesn't have the required columns."); + + List globalIndexes = globalIndex + .getGIsforRectangle(rect_); + + // Preparing data for SelectStmt. 
+ List tableRefs = new ArrayList(); + tableRefs.add(new TableRef(tableName_, null)); + + List items = new ArrayList(); + items.add(new SelectListItem(new SlotRef(tableName_, X), null)); + items.add(new SelectListItem(new SlotRef(tableName_, Y), null)); + + selectStmt_ = new SelectStmt(new SelectList(items), tableRefs, + createWherePredicate(globalIndexes), null, null, null, null); + + selectStmt_.analyze(analyzer); + } + + @Override + public String toSql() { + return "load points from table " + tableName_.getTbl() + + " overlaps rectangle" + rect_.toString() + ";"; + } + + public SelectStmt getSelectStmtIfAny() { + return selectStmt_; + } + + private Expr createWherePredicate(List globalIndexes) { + SlotRef globalIndexSlotRef = new SlotRef(tableName_, TAG); + if (globalIndexes == null || globalIndexes.size() == 0) { + return new IsNullPredicate(globalIndexSlotRef, false); + } + + // Creating Where predicate. + Expr wherePredicate = null; + + // Create Global Index predicate. + wherePredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ, + globalIndexSlotRef, new StringLiteral(globalIndexes.get(0) + .getTag())); + + for (int i = 1; i < globalIndexes.size(); i++) { + Expr globalIndexPredicate = new BinaryPredicate( + BinaryPredicate.Operator.EQ, globalIndexSlotRef, + new StringLiteral(globalIndexes.get(i).getTag())); + + wherePredicate = new CompoundPredicate( + CompoundPredicate.Operator.OR, wherePredicate, + globalIndexPredicate); + } + + // Create Rectangle predicate. + SlotRef xSlotRef = new SlotRef(tableName_, X); + SlotRef ySlotRef = new SlotRef(tableName_, Y); + + wherePredicate = new CompoundPredicate(CompoundPredicate.Operator.AND, + wherePredicate, new BinaryPredicate( + BinaryPredicate.Operator.GE, xSlotRef, + new NumericLiteral(new BigDecimal(rect_.getX1())))); + + wherePredicate = new CompoundPredicate(CompoundPredicate.Operator.AND, + wherePredicate, new BinaryPredicate( + BinaryPredicate.Operator.LE, xSlotRef, + new NumericLiteral(new BigDecimal(rect_.getX2())))); + + wherePredicate = new CompoundPredicate(CompoundPredicate.Operator.AND, + wherePredicate, new BinaryPredicate( + BinaryPredicate.Operator.GE, ySlotRef, + new NumericLiteral(new BigDecimal(rect_.getY1())))); + + wherePredicate = new CompoundPredicate(CompoundPredicate.Operator.AND, + wherePredicate, new BinaryPredicate( + BinaryPredicate.Operator.LE, ySlotRef, + new NumericLiteral(new BigDecimal(rect_.getY2())))); + + return wherePredicate; + } +} \ No newline at end of file diff --git a/fe/src/main/java/org/gistic/spatialImpala/catalog/GlobalIndex.java b/fe/src/main/java/org/gistic/spatialImpala/catalog/GlobalIndex.java new file mode 100644 index 0000000000..934ed83a93 --- /dev/null +++ b/fe/src/main/java/org/gistic/spatialImpala/catalog/GlobalIndex.java @@ -0,0 +1,178 @@ +// Copyright 2014 GISTIC. 
+ +package org.gistic.spatialImpala.catalog; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.Logger; + +import com.cloudera.impala.catalog.Catalog; +import com.cloudera.impala.catalog.CatalogObject; +import com.cloudera.impala.common.FileSystemUtil; +import com.cloudera.impala.thrift.TCatalogObjectType; +import com.cloudera.impala.thrift.TGlobalIndex; +import com.cloudera.impala.thrift.TGlobalIndexRecord; + +import java.io.IOException; +import java.util.Scanner; +import java.util.ArrayList; +import java.util.List; +import java.util.HashMap; +import java.util.Map.Entry; + +/* + * Global Index's catalog class responsible for holding the global indexes' + * records of a specific Spatial Table. + */ +public class GlobalIndex implements CatalogObject { + private static final Logger LOG = Logger.getLogger(GlobalIndex.class); + + private static String GLOBAL_INDEX_READ_EXCEPTION_MSG = "Couldn't parse Global Index file: "; + private static String GLOBAL_INDEX_SUFFIX = "_global_index"; + public static String GLOBAL_INDEX_TABLE_PARAM = "globalIndex"; + private HashMap globalIndexMap; + private final String tableName_; + private long catalogVersion_ = Catalog.INITIAL_CATALOG_VERSION; + + private GlobalIndex(String tableName, + HashMap globalIndexMap) { + this.tableName_ = tableName; + this.globalIndexMap = globalIndexMap; + } + + public List getGIsforPoint(int x, int y) { + List globalIndexes = new ArrayList(); + for (GlobalIndexRecord gIRecord : globalIndexMap.values()) { + if (gIRecord.getMBR().includesPoint(x, y)) + globalIndexes.add(gIRecord); + } + return globalIndexes; + } + + public List getGIsforKnn(Point pt) { + double maxdist = 0; + for (GlobalIndexRecord gIRecord : globalIndexMap.values()) { + if (gIRecord.getMBR().includesPoint(pt.getX(), pt.getY())) + maxdist = Math.max(maxdist, gIRecord.getMBR().getMaxDist(pt.getX(), pt.getY())); + } + List globalIndexes = new ArrayList(); + for (GlobalIndexRecord gIRecord : globalIndexMap.values()) { + if (gIRecord.getMBR().getMinDist(pt.getX(), pt.getY()) < maxdist ) + globalIndexes.add(gIRecord); + } + return globalIndexes; + } + + public List getGIsforRectangle(Rectangle rect) { + List globalIndexes = new ArrayList(); + for (GlobalIndexRecord gIRecord : globalIndexMap.values()) { + if (gIRecord.getMBR().overlaps(rect)) { + LOG.info("GI record: " + gIRecord.getMBR() + + " overlaps with: " + rect); + globalIndexes.add(gIRecord); + } + else { + LOG.info("GI record: " + gIRecord.getMBR() + + " does not overlap with: " + rect); + } + } + return globalIndexes; + } + + public GlobalIndexRecord getGIRecordforTag(String tag) { + return globalIndexMap.get(tag); + } + + public TCatalogObjectType getCatalogObjectType() { + return TCatalogObjectType.GLOBAL_INDEX; + } + + public String getName() { + return tableName_ + GLOBAL_INDEX_SUFFIX; + } + + public long getCatalogVersion() { + return catalogVersion_; + } + + public void setCatalogVersion(long newVersion) { + catalogVersion_ = newVersion; + } + + public boolean isLoaded() { + return true; + } + + public TGlobalIndex toThrift() { + HashMap tGlobalIndexMap = new HashMap(); + for (Entry gIRecord : globalIndexMap.entrySet()) { + tGlobalIndexMap.put(gIRecord.getKey(), gIRecord.getValue().toThrift()); + } + return new TGlobalIndex(tableName_, tGlobalIndexMap); + } + + private static HashMap loadGlobalIndex(String globalIndexPath) { + LOG.info("Loading global index file."); + Path gIPath = new Path(globalIndexPath.trim()); + gIPath = 
FileSystemUtil.createFullyQualifiedPath(gIPath); + String data = null; + try { + data = FileSystemUtil.readFile(gIPath); + } catch (IOException e) { + LOG.error(GLOBAL_INDEX_READ_EXCEPTION_MSG + globalIndexPath); + return null; + } + + if (data == null || data.length() == 0) { + LOG.error(GLOBAL_INDEX_READ_EXCEPTION_MSG + globalIndexPath); + return null; + } + + HashMap gIMap = new HashMap(); + Scanner scanner = new Scanner(data); + while (scanner.hasNext()) { + String[] separatedRecord = scanner.next().split(","); + if (separatedRecord.length != 6) { + LOG.error(GLOBAL_INDEX_READ_EXCEPTION_MSG + globalIndexPath); + scanner.close(); + return null; + } + + int id = 0; + double x1 = 0; + double y1 = 0; + double x2 = 0; + double y2 = 0; + + try { + id = Integer.parseInt(separatedRecord[0]); + x1 = Double.parseDouble(separatedRecord[1]); + y1 = Double.parseDouble(separatedRecord[2]); + x2 = Double.parseDouble(separatedRecord[3]); + y2 = Double.parseDouble(separatedRecord[4]); + } catch (Exception e) { + LOG.error(GLOBAL_INDEX_READ_EXCEPTION_MSG + globalIndexPath); + scanner.close(); + return null; + } + LOG.info("Reading Record: [" + id + ", " + separatedRecord[5] + ", " + x1 + ", " + y1 + ", " + x2 + ", " + y2 + "]"); + GlobalIndexRecord gIRecord = new GlobalIndexRecord(id, separatedRecord[5], new Rectangle(x1, y1, x2, y2)); + gIMap.put(separatedRecord[5], gIRecord); + } + scanner.close(); + return gIMap; + } + + public static GlobalIndex loadAndCreateGlobalIndex(String tableName, String globalIndexPath) { + HashMap gIMap = loadGlobalIndex(globalIndexPath); + return gIMap != null ? new GlobalIndex(tableName, gIMap) : null; + } + + public static GlobalIndex fromThrift(TGlobalIndex tGlobalIndex) { + HashMap gIMap = new HashMap(); + for (Entry gIRecord : tGlobalIndex.getGlobalIndexMap().entrySet()) { + gIMap.put(gIRecord.getKey(), GlobalIndexRecord.fromThrift(gIRecord.getValue())); + } + return new GlobalIndex(tGlobalIndex.getTbl_name(), gIMap); + } +} diff --git a/fe/src/main/java/org/gistic/spatialImpala/catalog/GlobalIndexRecord.java b/fe/src/main/java/org/gistic/spatialImpala/catalog/GlobalIndexRecord.java new file mode 100644 index 0000000000..aa8e19a4c9 --- /dev/null +++ b/fe/src/main/java/org/gistic/spatialImpala/catalog/GlobalIndexRecord.java @@ -0,0 +1,43 @@ +// Copyright 2014 GISTIC. + +package org.gistic.spatialImpala.catalog; + +import com.cloudera.impala.thrift.TGlobalIndexRecord; +import com.cloudera.impala.thrift.TRectangle; + +/* + * Global Index Record class responsible for holding a single + * global index partition. 
+ */ +public class GlobalIndexRecord { + private int id; + private String tag; + private Rectangle mbr; + + public GlobalIndexRecord(int id, String tag, Rectangle mbr) { + this.id = id; + this.tag = tag; + this.mbr = mbr; + } + + public int getId() { + return id; + } + + public String getTag() { + return tag; + } + + public Rectangle getMBR() { + return mbr; + } + + public TGlobalIndexRecord toThrift() { + return new TGlobalIndexRecord(this.id, this.tag, this.mbr.toThrift()); + } + + public static GlobalIndexRecord fromThrift(TGlobalIndexRecord gIRecord) { + TRectangle rect = gIRecord.getMbr(); + return new GlobalIndexRecord(gIRecord.getId(), gIRecord.getTag(), Rectangle.fromThrift(rect)); + } +} \ No newline at end of file diff --git a/fe/src/main/java/org/gistic/spatialImpala/catalog/Point.java b/fe/src/main/java/org/gistic/spatialImpala/catalog/Point.java new file mode 100644 index 0000000000..48716b0944 --- /dev/null +++ b/fe/src/main/java/org/gistic/spatialImpala/catalog/Point.java @@ -0,0 +1,36 @@ +// Copyright 2014 GISTIC. + +package org.gistic.spatialImpala.catalog; + +import com.cloudera.impala.thrift.TPoint; + +public class Point { + private double x; + private double y; + + public Point(double x, double y) { + this.x = x; + this.y = y; + } + + public TPoint toThrift() { + return new TPoint(this.x, this.y); + } + + public static Point fromThrift(TPoint p) { + return new Point(p.getX(), p.getY()); + } + + public double getX() { + return x; + } + + public double getY() { + return y; + } + + @Override + public String toString() { + return "(" + x + ", " + y + ")"; + } +} diff --git a/fe/src/main/java/org/gistic/spatialImpala/catalog/Rectangle.java b/fe/src/main/java/org/gistic/spatialImpala/catalog/Rectangle.java new file mode 100644 index 0000000000..73114f7225 --- /dev/null +++ b/fe/src/main/java/org/gistic/spatialImpala/catalog/Rectangle.java @@ -0,0 +1,75 @@ +// Copyright 2014 GISTIC. 
+
+package org.gistic.spatialImpala.catalog;
+
+import com.cloudera.impala.thrift.TRectangle;
+
+public class Rectangle {
+  private double x1;
+  private double y1;
+  private double x2;
+  private double y2;
+
+  public Rectangle(double x1, double y1, double x2, double y2) {
+    this.x1 = x1;
+    this.y1 = y1;
+    this.x2 = x2;
+    this.y2 = y2;
+  }
+
+  public boolean includesPoint(double x, double y) {
+    return (x >= x1) && (x <= x2) && (y >= y1) && (y <= y2);
+  }
+
+  public double getMaxDist(double x, double y) {
+    return Math.max(Math.max(dist(x, y, x1, y1), dist(x, y, x2, y1)),
+        Math.max(dist(x, y, x1, y2), dist(x, y, x2, y2)));
+  }
+
+  public double getMinDist(double x, double y) {
+    return Math.min(Math.min(dist(x, y, x1, y1), dist(x, y, x2, y1)),
+        Math.min(dist(x, y, x1, y2), dist(x, y, x2, y2)));
+  }
+
+  // Euclidean distance between (px, py) and (qx, qy). Parameters are named so
+  // they do not shadow the rectangle's own coordinate fields.
+  private double dist(double px, double py, double qx, double qy) {
+    return Math.sqrt((px - qx) * (px - qx) + (py - qy) * (py - qy));
+  }
+
+  public boolean overlaps(Rectangle rect) {
+    if (this.x1 > rect.x2 || this.x2 < rect.x1)
+      return false;
+
+    if (this.y1 > rect.y2 || this.y2 < rect.y1)
+      return false;
+
+    return true;
+  }
+
+  public TRectangle toThrift() {
+    return new TRectangle(this.x1, this.y1, this.x2, this.y2);
+  }
+
+  public static Rectangle fromThrift(TRectangle rect) {
+    return new Rectangle(rect.getX1(), rect.getY1(), rect.getX2(), rect.getY2());
+  }
+
+  public double getX1() {
+    return x1;
+  }
+
+  public double getY1() {
+    return y1;
+  }
+
+  public double getX2() {
+    return x2;
+  }
+
+  public double getY2() {
+    return y2;
+  }
+
+  @Override
+  public String toString() {
+    return "(" + x1 + ", " + y1 + ", " + x2 + ", " + y2 + ")";
+  }
+}
diff --git a/fe/src/main/java/org/gistic/spatialImpala/catalog/SpatialHdfsTable.java b/fe/src/main/java/org/gistic/spatialImpala/catalog/SpatialHdfsTable.java
new file mode 100644
index 0000000000..678617ded1
--- /dev/null
+++ b/fe/src/main/java/org/gistic/spatialImpala/catalog/SpatialHdfsTable.java
@@ -0,0 +1,73 @@
+// Copyright 2014 GISTIC.
+
+package org.gistic.spatialImpala.catalog;
+
+import com.cloudera.impala.catalog.Db;
+import com.cloudera.impala.catalog.HdfsTable;
+import com.cloudera.impala.catalog.TableId;
+import com.cloudera.impala.catalog.TableLoadingException;
+import com.cloudera.impala.thrift.TTable;
+
+import java.util.Map;
+
+public class SpatialHdfsTable extends HdfsTable {
+  protected GlobalIndex global_index_;
+
+  public SpatialHdfsTable(TableId id,
+      org.apache.hadoop.hive.metastore.api.Table msTbl, Db db,
+      String name, String owner) {
+    super(id, msTbl, db, name, owner);
+    global_index_ = initializeGlobalIndex(this.getName(), msTbl);
+  }
+
+  public GlobalIndex getGlobalIndexIfAny() {
+    return global_index_;
+  }
+
+  private GlobalIndex initializeGlobalIndex(String tableName,
+      org.apache.hadoop.hive.metastore.api.Table msTbl) {
+    if (msTbl == null)
+      return null;
+
+    Map<String, String> params = msTbl.getParameters();
+    if (params == null)
+      return null;
+
+    String globalIndexPath = params
+        .get(GlobalIndex.GLOBAL_INDEX_TABLE_PARAM);
+    if (globalIndexPath == null)
+      return null;
+
+    return GlobalIndex.loadAndCreateGlobalIndex(tableName, globalIndexPath);
+  }
+
+  @Override
+  public TTable toThrift() {
+    // Send all metadata between the catalog service and the FE.
+ TTable table = super.toThrift(); + if (global_index_ != null) + table.setGlobalIndex(global_index_.toThrift()); + return table; + } + + @Override + protected void loadFromThrift(TTable thriftTable) + throws TableLoadingException { + + super.loadFromThrift(thriftTable); + if (thriftTable.getGlobalIndex() != null) + global_index_ = GlobalIndex + .fromThrift(thriftTable.getGlobalIndex()); + } + + public static boolean isSpatial( + org.apache.hadoop.hive.metastore.api.Table msTbl) { + + return msTbl != null + && msTbl.getParameters() != null + && msTbl.getParameters().get( + GlobalIndex.GLOBAL_INDEX_TABLE_PARAM) != null; + } +} \ No newline at end of file diff --git a/fe/src/main/jflex/sql-scanner.flex b/fe/src/main/jflex/sql-scanner.flex index 07d327279a..904bba623a 100644 --- a/fe/src/main/jflex/sql-scanner.flex +++ b/fe/src/main/jflex/sql-scanner.flex @@ -131,6 +131,7 @@ import com.cloudera.impala.analysis.SqlParserSymbols; keywordMap.put("invalidate", new Integer(SqlParserSymbols.KW_INVALIDATE)); keywordMap.put("is", new Integer(SqlParserSymbols.KW_IS)); keywordMap.put("join", new Integer(SqlParserSymbols.KW_JOIN)); + keywordMap.put("knn", new Integer(SqlParserSymbols.KW_KNN)); keywordMap.put("last", new Integer(SqlParserSymbols.KW_LAST)); keywordMap.put("left", new Integer(SqlParserSymbols.KW_LEFT)); keywordMap.put("like", new Integer(SqlParserSymbols.KW_LIKE)); @@ -151,18 +152,22 @@ import com.cloudera.impala.analysis.SqlParserSymbols; keywordMap.put("order", new Integer(SqlParserSymbols.KW_ORDER)); keywordMap.put("outer", new Integer(SqlParserSymbols.KW_OUTER)); keywordMap.put("over", new Integer(SqlParserSymbols.KW_OVER)); + keywordMap.put("overlaps", new Integer(SqlParserSymbols.KW_OVERLAPS)); keywordMap.put("overwrite", new Integer(SqlParserSymbols.KW_OVERWRITE)); keywordMap.put("parquet", new Integer(SqlParserSymbols.KW_PARQUET)); keywordMap.put("parquetfile", new Integer(SqlParserSymbols.KW_PARQUETFILE)); keywordMap.put("partition", new Integer(SqlParserSymbols.KW_PARTITION)); keywordMap.put("partitioned", new Integer(SqlParserSymbols.KW_PARTITIONED)); keywordMap.put("partitions", new Integer(SqlParserSymbols.KW_PARTITIONS)); + keywordMap.put("point", new Integer(SqlParserSymbols.KW_POINT)); + keywordMap.put("points", new Integer(SqlParserSymbols.KW_POINTS)); keywordMap.put("preceding", new Integer(SqlParserSymbols.KW_PRECEDING)); keywordMap.put("prepare_fn", new Integer(SqlParserSymbols.KW_PREPARE_FN)); keywordMap.put("produced", new Integer(SqlParserSymbols.KW_PRODUCED)); keywordMap.put("range", new Integer(SqlParserSymbols.KW_RANGE)); keywordMap.put("rcfile", new Integer(SqlParserSymbols.KW_RCFILE)); keywordMap.put("real", new Integer(SqlParserSymbols.KW_DOUBLE)); + keywordMap.put("rectangle", new Integer(SqlParserSymbols.KW_RECTANGLE)); keywordMap.put("refresh", new Integer(SqlParserSymbols.KW_REFRESH)); keywordMap.put("regexp", new Integer(SqlParserSymbols.KW_REGEXP)); keywordMap.put("rename", new Integer(SqlParserSymbols.KW_RENAME)); @@ -213,6 +218,8 @@ import com.cloudera.impala.analysis.SqlParserSymbols; keywordMap.put("when", new Integer(SqlParserSymbols.KW_WHEN)); keywordMap.put("where", new Integer(SqlParserSymbols.KW_WHERE)); keywordMap.put("with", new Integer(SqlParserSymbols.KW_WITH)); + keywordMap.put("with_dist", new Integer(SqlParserSymbols.KW_WITHDIST)); + keywordMap.put("with_k", new Integer(SqlParserSymbols.KW_WITHK)); } // map from token id to token description diff --git a/fe/src/test/resources/hive-log4j.properties 
b/fe/src/test/resources/hive-log4j.properties
new file mode 100644
index 0000000000..ed532f8eee
--- /dev/null
+++ b/fe/src/test/resources/hive-log4j.properties
@@ -0,0 +1,62 @@
+# Define some default values that can be overridden by system properties
+hive.root.logger=INFO,DRFA
+hive.log.dir=/home/ubuntu/SpatialImpala/cluster_logs/hive
+hive.log.file=hive.log
+
+# Define the root logger to the system property "hadoop.root.logger".
+log4j.rootLogger=${hive.root.logger}, EventCounter
+
+# Logging Threshold
+log4j.threshhold=WARN
+
+#
+# Daily Rolling File Appender
+#
+
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hive.log.dir}/${hive.log.file}
+
+# Rollver at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+# Debugging Pattern format
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+#custom logging levels
+#log4j.logger.xxx=DEBUG
+
+#
+# Event Counter Appender
+# Sends counts of logging messages at different severity levels to Hadoop Metrics.
+#
+log4j.appender.EventCounter=org.apache.hadoop.log.metrics.EventCounter
+
+
+log4j.category.DataNucleus=ERROR,DRFA
+log4j.category.Datastore=ERROR,DRFA
+log4j.category.Datastore.Schema=ERROR,DRFA
+log4j.category.JPOX.Datastore=ERROR,DRFA
+log4j.category.JPOX.Plugin=ERROR,DRFA
+log4j.category.JPOX.MetaData=ERROR,DRFA
+log4j.category.JPOX.Query=ERROR,DRFA
+log4j.category.JPOX.General=ERROR,DRFA
+log4j.category.JPOX.Enhancer=ERROR,DRFA
+log4j.logger.org.apache.hadoop.conf.Configuration=ERROR,DRFA
+
diff --git a/fe/src/test/resources/sentry-site.xml b/fe/src/test/resources/sentry-site.xml
new file mode 100644
index 0000000000..d9c76cde1d
--- /dev/null
+++ b/fe/src/test/resources/sentry-site.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<configuration>
+  <property>
+    <name>sentry.service.security.mode</name>
+    <value>none</value>
+  </property>
+  <property>
+    <name>sentry.service.admin.group</name>
+    <value>root</value>
+  </property>
+  <property>
+    <name>sentry.service.server.rpc-address</name>
+    <value>localhost</value>
+  </property>
+  <property>
+    <name>sentry.service.server.rpc-port</name>
+    <value>30911</value>
+  </property>
+  <property>
+    <name>sentry.service.client.server.rpc-address</name>
+    <value>localhost</value>
+  </property>
+  <property>
+    <name>sentry.service.client.server.rpc-port</name>
+    <value>30911</value>
+  </property>
+  <property>
+    <name>sentry.store.jdbc.url</name>
+    <value>jdbc:postgresql://localhost:5432/sentry_policy/;create=true</value>
+  </property>
+  <property>
+    <name>sentry.store.jdbc.user</name>
+    <value>hiveuser</value>
+  </property>
+  <property>
+    <name>sentry.store.jdbc.password</name>
+    <value>password</value>
+  </property>
+  <property>
+    <name>sentry.verify.schema.version</name>
+    <value>false</value>
+  </property>
+</configuration>
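For reference, the two statements added to the grammar take the shape shown below. The table name, coordinates, and the ordering expression after with_dist are illustrative only; the sketch assumes the CUP- and JFlex-generated SqlParser and SqlScanner classes from this frontend build and is just a rough way to exercise the new productions, not a test that ships with the patch.

import java.io.StringReader;
import com.cloudera.impala.analysis.SqlParser;
import com.cloudera.impala.analysis.SqlScanner;
import com.cloudera.impala.analysis.StatementBase;

public class SpatialSyntaxExample {
  public static void main(String[] args) throws Exception {
    // Range (point-inclusion) query: points of a hypothetical table 'tweets'
    // falling inside the rectangle (x1, y1, x2, y2).
    String inclusion =
        "load points from table tweets overlaps rectangle(30.0, 30.0, 40.0, 40.0)";
    // KNN query: the 10 nearest points to (35.0, 32.5); with_dist supplies the
    // ordering expression(s), here a hypothetical 'dist' column.
    String knn =
        "load points from table tweets knn point(35.0, 32.5) with_k 10 with_dist dist";
    for (String stmt : new String[] { inclusion, knn }) {
      SqlScanner scanner = new SqlScanner(new StringReader(stmt));
      SqlParser parser = new SqlParser(scanner);
      StatementBase parsed = (StatementBase) parser.parse().value;
      System.out.println(parsed.toSql());
    }
  }
}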
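The only thing that marks a table as spatial is the globalIndex entry in its table properties: CreateTableStmt.analyze() validates the value as an HDFS URI, and Table.fromMetastoreTable() builds a SpatialHdfsTable instead of a plain HdfsTable whenever the property is present. A minimal sketch of that check; the index path and the DDL in the comment are made up for the example.

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hive.metastore.api.Table;
import org.gistic.spatialImpala.catalog.GlobalIndex;
import org.gistic.spatialImpala.catalog.SpatialHdfsTable;

public class SpatialTablePropertyExample {
  public static void main(String[] args) {
    // Equivalent DDL (illustrative path; the analysis requires tag, x and y columns):
    //   CREATE TABLE tweets (id INT, tag STRING, x DOUBLE, y DOUBLE)
    //   TBLPROPERTIES ('globalIndex' = '/spatial/tweets/_global_index');
    Table msTbl = new Table();
    Map<String, String> params = new HashMap<String, String>();
    params.put(GlobalIndex.GLOBAL_INDEX_TABLE_PARAM, "/spatial/tweets/_global_index");
    msTbl.setParameters(params);
    // Table.fromMetastoreTable() uses the same predicate to decide whether to
    // instantiate a SpatialHdfsTable.
    System.out.println(SpatialHdfsTable.isSpatial(msTbl));  // prints: true
  }
}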
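GlobalIndex.loadGlobalIndex() reads the file behind that property as whitespace-separated records, each record being six comma-separated fields: id, x1, y1, x2, y2, tag, where (x1, y1, x2, y2) is the partition's MBR and tag is the value stored in the table's tag column for rows of that partition. A small standalone sketch of the same field layout applied to an invented record:

public class GlobalIndexRecordFormatExample {
  public static void main(String[] args) {
    // One whitespace-delimited token per partition, fields separated by commas:
    //   <id>,<x1>,<y1>,<x2>,<y2>,<tag>
    String record = "3,30.0,30.0,35.0,35.0,part-00003";  // illustrative values
    String[] f = record.split(",");
    int id = Integer.parseInt(f[0]);
    double x1 = Double.parseDouble(f[1]);
    double y1 = Double.parseDouble(f[2]);
    double x2 = Double.parseDouble(f[3]);
    double y2 = Double.parseDouble(f[4]);
    String tag = f[5];
    System.out.println("partition " + tag + " (id " + id + ") has MBR ("
        + x1 + ", " + y1 + ", " + x2 + ", " + y2 + ")");
  }
}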
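SpatialPointInclusionStmt has no execution path of its own: during analysis it drops partitions whose MBR does not overlap the query rectangle and rewrites itself into an ordinary SelectStmt over x and y, with an equality predicate per surviving tag plus coordinate range predicates. The self-contained sketch below mimics that rewrite on made-up partitions and prints roughly the SELECT the analyzer hands to the planner; the table name and tags are invented.

import java.util.LinkedHashMap;
import java.util.Map;

public class PointInclusionRewriteExample {
  // Same test as Rectangle.overlaps(); MBRs are encoded as {x1, y1, x2, y2}.
  static boolean overlaps(double[] a, double[] b) {
    return !(a[0] > b[2] || a[2] < b[0] || a[1] > b[3] || a[3] < b[1]);
  }

  public static void main(String[] args) {
    double[] query = { 30.0, 30.0, 40.0, 40.0 };
    Map<String, double[]> globalIndex = new LinkedHashMap<String, double[]>();
    globalIndex.put("part-00000", new double[] { 0.0, 0.0, 25.0, 25.0 });
    globalIndex.put("part-00001", new double[] { 25.0, 25.0, 50.0, 50.0 });

    StringBuilder tagPredicate = new StringBuilder();
    for (Map.Entry<String, double[]> e : globalIndex.entrySet()) {
      if (!overlaps(e.getValue(), query)) continue;  // partition pruned
      if (tagPredicate.length() > 0) tagPredicate.append(" OR ");
      tagPredicate.append("tag = '").append(e.getKey()).append("'");
    }
    // When nothing overlaps, the real statement falls back to a 'tag IS NULL' predicate.
    System.out.println("SELECT x, y FROM tweets WHERE (" + tagPredicate
        + ") AND x >= 30.0 AND x <= 40.0 AND y >= 30.0 AND y <= 40.0");
  }
}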
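For KNN, GlobalIndex.getGIsforKnn() derives an upper bound on how far the answer can lie: the largest corner distance among partitions whose MBR already contains the query point. It then keeps every partition whose nearest corner is closer than that bound; the surviving tags feed the same kind of tag predicate, and with_k / with_dist become the LIMIT and ORDER BY of the generated SELECT. A standalone illustration of that bound on two invented MBRs:

public class KnnPruningExample {
  static double dist(double ax, double ay, double bx, double by) {
    return Math.sqrt((ax - bx) * (ax - bx) + (ay - by) * (ay - by));
  }

  // Largest / smallest distance from (x, y) to the four corners of {x1, y1, x2, y2}.
  static double maxCornerDist(double[] r, double x, double y) {
    return Math.max(Math.max(dist(x, y, r[0], r[1]), dist(x, y, r[2], r[1])),
                    Math.max(dist(x, y, r[0], r[3]), dist(x, y, r[2], r[3])));
  }
  static double minCornerDist(double[] r, double x, double y) {
    return Math.min(Math.min(dist(x, y, r[0], r[1]), dist(x, y, r[2], r[1])),
                    Math.min(dist(x, y, r[0], r[3]), dist(x, y, r[2], r[3])));
  }
  static boolean contains(double[] r, double x, double y) {
    return x >= r[0] && x <= r[2] && y >= r[1] && y <= r[3];
  }

  public static void main(String[] args) {
    double px = 32.0, py = 33.0;
    double[][] mbrs = { { 30.0, 30.0, 35.0, 35.0 }, { 60.0, 60.0, 70.0, 70.0 } };

    // Upper bound: the k nearest points can be no farther than the farthest
    // corner of some partition that already contains the query point.
    double bound = 0;
    for (double[] r : mbrs) {
      if (contains(r, px, py)) bound = Math.max(bound, maxCornerDist(r, px, py));
    }
    // Keep only partitions that could hold a point within that bound.
    for (double[] r : mbrs) {
      System.out.println(minCornerDist(r, px, py) < bound ? "scan" : "prune");
    }
  }
}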