diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift index 6331dc816f..e616e7c3bb 100644 --- a/common/thrift/CatalogObjects.thrift +++ b/common/thrift/CatalogObjects.thrift @@ -266,6 +266,12 @@ struct TDataSourceTable { 2: required string init_string } +// Represents a 2D point with double-precision x and y coordinates. +struct TPoint { + 1: required double x + 2: required double y +} + // Represents a rectangle used in a global index record for spatial tables. struct TRectangle { 1: required double x1 diff --git a/fe/src/main/cup/sql-parser.y b/fe/src/main/cup/sql-parser.y index b6b2c04a2d..1bd79527f9 100644 --- a/fe/src/main/cup/sql-parser.y +++ b/fe/src/main/cup/sql-parser.y @@ -14,8 +14,10 @@ package com.cloudera.impala.analysis; +import org.gistic.spatialImpala.catalog.Point; import org.gistic.spatialImpala.catalog.Rectangle; import org.gistic.spatialImpala.analysis.SpatialPointInclusionStmt; +import org.gistic.spatialImpala.analysis.SpatialKnnStmt; import com.cloudera.impala.catalog.Type; import com.cloudera.impala.catalog.ScalarType; @@ -242,10 +244,10 @@ terminal KW_FIRST, KW_FLOAT, KW_FOLLOWING, KW_FOR, KW_FORMAT, KW_FORMATTED, KW_FROM, KW_FULL, KW_FUNCTION, KW_FUNCTIONS, KW_GRANT, KW_GROUP, KW_HAVING, KW_IF, KW_IN, KW_INIT_FN, KW_INNER, KW_INPATH, KW_INSERT, KW_INT, KW_INTERMEDIATE, KW_INTERVAL, KW_INTO, - KW_INVALIDATE, KW_IS, KW_JOIN, KW_LAST, KW_LEFT, KW_LIKE, KW_LIMIT, KW_LINES, KW_LOAD, + KW_INVALIDATE, KW_IS, KW_JOIN, KW_KNN, KW_LAST, KW_LEFT, KW_LIKE, KW_LIMIT, KW_LINES, KW_LOAD, KW_LOCATION, KW_MAP, KW_MERGE_FN, KW_METADATA, KW_NOT, KW_NULL, KW_NULLS, KW_OFFSET, KW_ON, KW_OR, KW_ORDER, KW_OUTER, KW_OVER, KW_OVERLAPS, KW_OVERWRITE, KW_PARQUET, KW_PARQUETFILE, - KW_PARTITION, KW_PARTITIONED, KW_PARTITIONS, KW_POINTS, KW_PRECEDING, + KW_PARTITION, KW_PARTITIONED, KW_PARTITIONS, KW_POINT, KW_POINTS, KW_PRECEDING, KW_PREPARE_FN, KW_PRODUCED, KW_RANGE, KW_RCFILE, KW_RECTANGLE, KW_REFRESH, KW_REGEXP, KW_RENAME, KW_REPLACE, KW_RETURNS, KW_REVOKE, KW_RIGHT, KW_RLIKE, 
KW_ROLE, KW_ROLES, KW_ROW, KW_ROWS, KW_SCHEMA, KW_SCHEMAS, KW_SELECT, KW_SEMI, KW_SEQUENCEFILE, KW_SERDEPROPERTIES, @@ -254,7 +256,7 @@ terminal KW_TEXTFILE, KW_THEN, KW_TIMESTAMP, KW_TINYINT, KW_STATS, KW_TO, KW_TRUE, KW_UNBOUNDED, KW_UNCACHED, KW_UNION, KW_UPDATE_FN, KW_USE, KW_USING, - KW_VALUES, KW_VARCHAR, KW_VIEW, KW_WHEN, KW_WHERE, KW_WITH; + KW_VALUES, KW_VARCHAR, KW_VIEW, KW_WHEN, KW_WHERE, KW_WITH, KW_WITHDIST, KW_WITHK; terminal COLON, COMMA, DOT, DOTDOTDOT, STAR, LPAREN, RPAREN, LBRACKET, RBRACKET, DIVIDE, MOD, ADD, SUBTRACT; @@ -445,6 +447,9 @@ nonterminal CreateFunctionStmtBase.OptArg create_function_arg_key; nonterminal SpatialPointInclusionStmt spatial_point_inclusion_stmt; nonterminal Rectangle rectangle; nonterminal NumericLiteral rectangle_arg; +nonterminal SpatialKnnStmt spatial_knn_stmt; +nonterminal Point point; +nonterminal NumericLiteral point_arg; precedence left KW_OR; precedence left KW_AND; @@ -559,6 +564,8 @@ stmt ::= {: RESULT = revoke_privilege; :} | spatial_point_inclusion_stmt:spatial_point_inclusion {: RESULT = spatial_point_inclusion; :} + | spatial_knn_stmt:spatial_knn + {: RESULT = spatial_knn; :} ; spatial_point_inclusion_stmt ::= @@ -566,6 +573,11 @@ spatial_point_inclusion_stmt ::= {: RESULT = new SpatialPointInclusionStmt(tbl, rect); :} ; +spatial_knn_stmt ::= + KW_LOAD KW_POINTS KW_FROM KW_TABLE table_name:tbl KW_KNN point:p KW_WITHK expr:k KW_WITHDIST order_by_elements:dist + {: RESULT = new SpatialKnnStmt(tbl,p,new LimitElement(k, null), dist); :} + ; + rectangle ::= KW_RECTANGLE LPAREN rectangle_arg:x1 COMMA rectangle_arg:y1 COMMA rectangle_arg:x2 COMMA rectangle_arg:y2 RPAREN @@ -573,6 +585,11 @@ rectangle ::= x2.getDoubleValue(), y2.getDoubleValue()); :} ; +point ::= + KW_POINT LPAREN point_arg:x COMMA point_arg:y RPAREN + {: RESULT = new Point(x.getDoubleValue(), y.getDoubleValue()); :} + ; + rectangle_arg ::= INTEGER_LITERAL:l {: RESULT = new NumericLiteral(l); :} @@ -580,6 +597,13 @@ rectangle_arg ::= {: RESULT = 
new NumericLiteral(l); :} ; +point_arg ::= + INTEGER_LITERAL:l + {: RESULT = new NumericLiteral(l); :} + | DECIMAL_LITERAL:l + {: RESULT = new NumericLiteral(l); :} + ; + load_stmt ::= KW_LOAD KW_DATA KW_INPATH STRING_LITERAL:path overwrite_val:overwrite KW_INTO KW_TABLE table_name:table opt_partition_spec:partition diff --git a/fe/src/main/java/com/cloudera/impala/analysis/AnalysisContext.java b/fe/src/main/java/com/cloudera/impala/analysis/AnalysisContext.java index 9839242c9f..a4103ab6f8 100644 --- a/fe/src/main/java/com/cloudera/impala/analysis/AnalysisContext.java +++ b/fe/src/main/java/com/cloudera/impala/analysis/AnalysisContext.java @@ -58,7 +58,8 @@ static public class AnalysisResult { public boolean isComputeStatsStmt() { return stmt_ instanceof ComputeStatsStmt; } public boolean isQueryStmt() { return (stmt_ instanceof QueryStmt - || stmt_ instanceof SpatialPointInclusionStmt); + || stmt_ instanceof SpatialPointInclusionStmt + || stmt_ instanceof SpatialKnnStmt); } public boolean isInsertStmt() { return stmt_ instanceof InsertStmt; } public boolean isDropDbStmt() { return stmt_ instanceof DropDbStmt; } @@ -212,6 +213,8 @@ public QueryStmt getQueryStmt() { return (QueryStmt) stmt_; else if (stmt_ instanceof SpatialPointInclusionStmt) return ((SpatialPointInclusionStmt) stmt_).getSelectStmtIfAny(); + else if (stmt_ instanceof SpatialKnnStmt) + return ((SpatialKnnStmt) stmt_).getSelectStmtIfAny(); else return null; } diff --git a/fe/src/main/java/com/cloudera/impala/analysis/LimitElement.java b/fe/src/main/java/com/cloudera/impala/analysis/LimitElement.java index f2f1ef3c57..f7e211b867 100644 --- a/fe/src/main/java/com/cloudera/impala/analysis/LimitElement.java +++ b/fe/src/main/java/com/cloudera/impala/analysis/LimitElement.java @@ -23,7 +23,7 @@ /** * Combination of limit and offset expressions. 
*/ -class LimitElement { +public class LimitElement { private final Expr limitExpr_; private final Expr offsetExpr_; private long limit_;
diff --git a/fe/src/main/java/org/gistic/spatialImpala/analysis/SpatialKnnStmt.java b/fe/src/main/java/org/gistic/spatialImpala/analysis/SpatialKnnStmt.java
new file mode 100644
index 0000000000..781bc19408
--- /dev/null
+++ b/fe/src/main/java/org/gistic/spatialImpala/analysis/SpatialKnnStmt.java
@@ -0,0 +1,168 @@
+// Copyright 2014 GISTIC.
+
+package org.gistic.spatialImpala.analysis;
+
+import org.gistic.spatialImpala.catalog.*;
+
+import com.cloudera.impala.catalog.Table;
+import com.cloudera.impala.authorization.Privilege;
+import com.cloudera.impala.common.AnalysisException;
+import com.cloudera.impala.analysis.Analyzer;
+import com.cloudera.impala.analysis.StatementBase;
+import com.cloudera.impala.analysis.SelectStmt;
+import com.cloudera.impala.analysis.SelectListItem;
+import com.cloudera.impala.analysis.SelectList;
+import com.cloudera.impala.analysis.TableName;
+import com.cloudera.impala.analysis.TableRef;
+import com.cloudera.impala.analysis.SlotRef;
+import com.cloudera.impala.analysis.IsNullPredicate;
+import com.cloudera.impala.analysis.Expr;
+import com.cloudera.impala.analysis.CompoundPredicate;
+import com.cloudera.impala.analysis.BinaryPredicate;
+import com.cloudera.impala.analysis.IsNullPredicate;
+import com.cloudera.impala.analysis.StringLiteral;
+import com.cloudera.impala.analysis.NumericLiteral;
+import com.cloudera.impala.analysis.LimitElement;
+import com.cloudera.impala.analysis.OrderByElement;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.math.BigDecimal;
+
+/**
+ * Represents a spatial k-nearest-neighbours (kNN) statement of the form
+ * "LOAD POINTS FROM TABLE tbl KNN POINT(x, y) WITH_K k WITH_DIST order_by_elements".
+ *
+ * Analysis rewrites the statement into a SelectStmt over the x and y columns of
+ * the table, restricted to the tags of the global index records returned by
+ * GlobalIndex.getGIsforKnn() for the query point, with the WITH_DIST elements as
+ * ORDER BY and the WITH_K element as LIMIT.
+ */
+public class SpatialKnnStmt extends StatementBase {
+    private static final String TABLE_NOT_SPATIAL_ERROR_MSG = "Table is not a spatial table.";
+    private static final String TAG = "tag";
+    private static final String X = "x";
+    private static final String Y = "y";
+    private TableName tableName_;
+    private final Point p_;
+    private final LimitElement k_;
+    private final ArrayList<OrderByElement> dist_;
+
+    // Initialized during analysis.
+    private SelectStmt selectStmt_;
+
+    /**
+     * @param tblName table to query; qualified with the default database during
+     *        analysis when it is not fully qualified.
+     * @param p query point.
+     * @param k limit element built from the WITH_K expression.
+     * @param dist order-by elements parsed from the WITH_DIST clause.
+     */
+    public SpatialKnnStmt(TableName tblName, Point p, LimitElement k,
+            ArrayList<OrderByElement> dist) {
+        this.tableName_ = tblName;
+        this.p_ = p;
+        this.k_ = k;
+        this.dist_ = dist;
+        this.selectStmt_ = null;
+    }
+
+    /**
+     * Resolves the table, checks that it is a spatial table with a global index
+     * and the tag, x and y columns, then builds and analyzes the SelectStmt that
+     * implements this statement.
+     *
+     * @throws AnalysisException if the table is not a spatial table, has no global
+     *         index or lacks one of the required columns.
+     */
+    @Override
+    public void analyze(Analyzer analyzer) throws AnalysisException {
+        // Getting table and checking for existence.
+        Table table;
+        if (!tableName_.isFullyQualified()) {
+            tableName_ = new TableName(analyzer.getDefaultDb(),
+                    tableName_.getTbl());
+        }
+        table = analyzer.getTable(tableName_, Privilege.SELECT);
+
+        // Table should be an instance of a Spatial table.
+        if (!(table instanceof SpatialHdfsTable))
+            throw new AnalysisException(TABLE_NOT_SPATIAL_ERROR_MSG);
+
+        SpatialHdfsTable spatialTable = (SpatialHdfsTable) table;
+
+        // Global index shouldn't be null.
+        GlobalIndex globalIndex = spatialTable.getGlobalIndexIfAny();
+        if (globalIndex == null)
+            throw new AnalysisException(TABLE_NOT_SPATIAL_ERROR_MSG
+                    + " : Table doesn't have global indexes.");
+
+        List<String> columnNames = spatialTable.getColumnNames();
+        if (!(columnNames.contains(TAG) && columnNames.contains(X) && columnNames
+                .contains(Y)))
+            throw new AnalysisException(TABLE_NOT_SPATIAL_ERROR_MSG
+                    + " : Table doesn't have the required columns.");
+
+        List<GlobalIndexRecord> globalIndexes = globalIndex
+                .getGIsforKnn(p_);
+
+        // Preparing data for SelectStmt.
+        List<TableRef> tableRefs = new ArrayList<TableRef>();
+        tableRefs.add(new TableRef(tableName_, null));
+
+        List<SelectListItem> items = new ArrayList<SelectListItem>();
+        items.add(new SelectListItem(new SlotRef(tableName_, X), null));
+        items.add(new SelectListItem(new SlotRef(tableName_, Y), null));
+
+        selectStmt_ = new SelectStmt(new SelectList(items), tableRefs,
+                createWherePredicate(globalIndexes), null, null, dist_, k_);
+
+        selectStmt_.analyze(analyzer);
+    }
+
+    /**
+     * Returns a textual rendering of this statement.
+     * NOTE(review): k_ is rendered through its toString() and the WITH_DIST
+     * elements are not rendered at all - confirm whether LimitElement and
+     * OrderByElement offer a SQL rendering that should be used here.
+     */
+    @Override
+    public String toSql() {
+        return "load points from table " + tableName_.getTbl()
+                + " knn point" + p_.toString() + " with_k " + k_ + ";";
+    }
+
+    /** Returns the SelectStmt built by analyze(); null until analyze() has built it. */
+    public SelectStmt getSelectStmtIfAny() {
+        return selectStmt_;
+    }
+
+    /**
+     * Builds the predicate "tag = t1 OR tag = t2 OR ..." over the tags of the given
+     * global index records. When the list is null or empty, an IsNullPredicate on
+     * the tag column (constructed with false) is returned instead.
+     */
+    private Expr createWherePredicate(List<GlobalIndexRecord> globalIndexes) {
+        SlotRef globalIndexSlotRef = new SlotRef(tableName_, TAG);
+        if (globalIndexes == null || globalIndexes.size() == 0) {
+            return new IsNullPredicate(globalIndexSlotRef, false);
+        }
+
+        // Start with the first tag, then OR in one equality predicate per remaining tag.
+        Expr wherePredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ,
+                globalIndexSlotRef, new StringLiteral(globalIndexes.get(0)
+                        .getTag()));
+
+        for (int i = 1; i < globalIndexes.size(); i++) {
+            Expr globalIndexPredicate = new BinaryPredicate(
+                    BinaryPredicate.Operator.EQ, globalIndexSlotRef,
+                    new StringLiteral(globalIndexes.get(i).getTag()));
+
+            wherePredicate = new CompoundPredicate(
+                    CompoundPredicate.Operator.OR, wherePredicate,
+                    globalIndexPredicate);
+        }
+
+        return wherePredicate;
+    }
+}
diff --git a/fe/src/main/java/org/gistic/spatialImpala/catalog/GlobalIndex.java b/fe/src/main/java/org/gistic/spatialImpala/catalog/GlobalIndex.java
index e42c817cd6..934ed83a93 100644
--- a/fe/src/main/java/org/gistic/spatialImpala/catalog/GlobalIndex.java
+++ b/fe/src/main/java/org/gistic/spatialImpala/catalog/GlobalIndex.java
@@ -48,7 +48,44 @@ public List getGIsforPoint(int x, int y) {
         }
         return globalIndexes;
     }
-
+
+    /**
+     * Returns the global index records whose partitions may hold nearest
+     * neighbours of pt.
+     *
+     * The search radius is the largest getMaxDist() among the records whose MBR
+     * includes pt. When no MBR includes pt, the smallest getMaxDist() over all
+     * records is used instead, so that a query point outside every MBR still
+     * selects at least one record instead of none. Every record whose getMinDist()
+     * does not exceed the radius is returned.
+     */
+    public List<GlobalIndexRecord> getGIsforKnn(Point pt) {
+        double x = pt.getX();
+        double y = pt.getY();
+        boolean contained = false;
+        double maxdist = 0;
+        double fallbackDist = Double.POSITIVE_INFINITY;
+        for (GlobalIndexRecord gIRecord : globalIndexMap.values()) {
+            Rectangle mbr = gIRecord.getMBR();
+            double recordMaxDist = mbr.getMaxDist(x, y);
+            fallbackDist = Math.min(fallbackDist, recordMaxDist);
+            if (mbr.includesPoint(x, y)) {
+                contained = true;
+                maxdist = Math.max(maxdist, recordMaxDist);
+            }
+        }
+        if (!contained)
+            maxdist = fallbackDist;
+
+        List<GlobalIndexRecord> globalIndexes = new ArrayList<GlobalIndexRecord>();
+        for (GlobalIndexRecord gIRecord : globalIndexMap.values()) {
+            // <= rather than <: keeps a zero-area MBR located exactly at pt.
+            if (gIRecord.getMBR().getMinDist(x, y) <= maxdist)
+                globalIndexes.add(gIRecord);
+        }
+        return globalIndexes;
+    }
+
     public List getGIsforRectangle(Rectangle rect) {
         List globalIndexes = new ArrayList();
         for (GlobalIndexRecord gIRecord : globalIndexMap.values()) {
@@ -161,4 +198,4 @@ public static GlobalIndex fromThrift(TGlobalIndex tGlobalIndex) {
         }
         return new GlobalIndex(tGlobalIndex.getTbl_name(), gIMap);
     }
-}
\ No newline at end of file
+}
diff --git a/fe/src/main/java/org/gistic/spatialImpala/catalog/Point.java b/fe/src/main/java/org/gistic/spatialImpala/catalog/Point.java
new file mode 100644
index 0000000000..48716b0944
--- /dev/null
+++ b/fe/src/main/java/org/gistic/spatialImpala/catalog/Point.java
@@ -0,0 +1,36 @@
+// Copyright 2014 GISTIC.
+
+package org.gistic.spatialImpala.catalog;
+
+import com.cloudera.impala.thrift.TPoint;
+/** An immutable 2D point with double-precision x and y coordinates. */
+public class Point {
+    private final double x;
+    private final double y;
+    /** Creates the point (x, y). */
+    public Point(double x, double y) {
+        this.x = x;
+        this.y = y;
+    }
+    /** Converts this point to its thrift representation. */
+    public TPoint toThrift() {
+        return new TPoint(this.x, this.y);
+    }
+    /** Creates a point from its thrift representation. */
+    public static Point fromThrift(TPoint p) {
+        return new Point(p.getX(), p.getY());
+    }
+    /** Returns the x coordinate. */
+    public double getX() {
+        return x;
+    }
+    /** Returns the y coordinate. */
+    public double getY() {
+        return y;
+    }
+    /** Returns the point formatted as "(x, y)". */
+    @Override
+    public String toString() {
+        return "(" + x + ", " + y + ")";
+    }
+}
diff --git a/fe/src/main/java/org/gistic/spatialImpala/catalog/Rectangle.java b/fe/src/main/java/org/gistic/spatialImpala/catalog/Rectangle.java
index 81d854ec47..73114f7225 100644
--- a/fe/src/main/java/org/gistic/spatialImpala/catalog/Rectangle.java
+++ b/fe/src/main/java/org/gistic/spatialImpala/catalog/Rectangle.java
@@ -17,9 +17,36 @@ public Rectangle(double x1, double y1, double x2, double y2) {
         this.y2 = y2;
     }
 
-    public boolean includesPoint(int x, int y) {
+    /** Returns true if (x, y) lies inside this rectangle or on its border. */
+    public boolean includesPoint(double x, double y) {
         return (x >= x1) && (x <= x2) && (y >= y1) && (y <= y2);
     }
+
+    /**
+     * Returns the largest Euclidean distance from (x, y) to any point of this
+     * rectangle; it is attained at one of the four corners.
+     */
+    public double getMaxDist(double x, double y) {
+        return Math.max(Math.max(dist(x, y, x1, y1), dist(x, y, x2, y1)),
+                Math.max(dist(x, y, x1, y2), dist(x, y, x2, y2)));
+    }
+
+    /**
+     * Returns the smallest Euclidean distance from (x, y) to any point of this
+     * rectangle: 0 when the point is inside or on the border, otherwise the
+     * distance to the nearest edge or corner. Like includesPoint(), this
+     * assumes x1 is not greater than x2 and y1 is not greater than y2.
+     */
+    public double getMinDist(double x, double y) {
+        double dx = Math.max(0.0, Math.max(x1 - x, x - x2));
+        double dy = Math.max(0.0, Math.max(y1 - y, y - y2));
+        return Math.sqrt(dx * dx + dy * dy);
+    }
+
+    /** Returns the Euclidean distance between (px, py) and (qx, qy). */
+    private double dist(double px, double py, double qx, double qy) {
+        return Math.sqrt((px - qx) * (px - qx) + (py - qy) * (py - qy));
+    }
 
     public boolean overlaps(Rectangle rect) {
         if (this.x1 > rect.x2 || this.x2 < rect.x1)
@@ -60,4 +87,4 @@ public String toString() { return "(" + x1 + ", " + y1 + ", " + x2 + ", " + y2 + ")"; } -} \ No newline at end of file +}
diff --git a/fe/src/main/jflex/sql-scanner.flex b/fe/src/main/jflex/sql-scanner.flex index 547c1ea5e4..904bba623a 100644 --- a/fe/src/main/jflex/sql-scanner.flex +++ 
b/fe/src/main/jflex/sql-scanner.flex @@ -131,6 +131,7 @@ import com.cloudera.impala.analysis.SqlParserSymbols; keywordMap.put("invalidate", new Integer(SqlParserSymbols.KW_INVALIDATE)); keywordMap.put("is", new Integer(SqlParserSymbols.KW_IS)); keywordMap.put("join", new Integer(SqlParserSymbols.KW_JOIN)); + keywordMap.put("knn", new Integer(SqlParserSymbols.KW_KNN)); keywordMap.put("last", new Integer(SqlParserSymbols.KW_LAST)); keywordMap.put("left", new Integer(SqlParserSymbols.KW_LEFT)); keywordMap.put("like", new Integer(SqlParserSymbols.KW_LIKE)); @@ -158,6 +159,7 @@ import com.cloudera.impala.analysis.SqlParserSymbols; keywordMap.put("partition", new Integer(SqlParserSymbols.KW_PARTITION)); keywordMap.put("partitioned", new Integer(SqlParserSymbols.KW_PARTITIONED)); keywordMap.put("partitions", new Integer(SqlParserSymbols.KW_PARTITIONS)); + keywordMap.put("point", new Integer(SqlParserSymbols.KW_POINT)); keywordMap.put("points", new Integer(SqlParserSymbols.KW_POINTS)); keywordMap.put("preceding", new Integer(SqlParserSymbols.KW_PRECEDING)); keywordMap.put("prepare_fn", new Integer(SqlParserSymbols.KW_PREPARE_FN)); @@ -216,6 +218,8 @@ import com.cloudera.impala.analysis.SqlParserSymbols; keywordMap.put("when", new Integer(SqlParserSymbols.KW_WHEN)); keywordMap.put("where", new Integer(SqlParserSymbols.KW_WHERE)); keywordMap.put("with", new Integer(SqlParserSymbols.KW_WITH)); + keywordMap.put("with_dist", new Integer(SqlParserSymbols.KW_WITHDIST)); + keywordMap.put("with_k", new Integer(SqlParserSymbols.KW_WITHK)); } // map from token id to token description diff --git a/fe/src/test/resources/hive-log4j.properties b/fe/src/test/resources/hive-log4j.properties new file mode 100644 index 0000000000..ed532f8eee --- /dev/null +++ b/fe/src/test/resources/hive-log4j.properties @@ -0,0 +1,62 @@ +# Define some default values that can be overridden by system properties +hive.root.logger=INFO,DRFA +hive.log.dir=/home/ubuntu/SpatialImpala/cluster_logs/hive 
+hive.log.file=hive.log + +# Define the root logger to the system property "hive.root.logger". +log4j.rootLogger=${hive.root.logger}, EventCounter + +# Logging Threshold +log4j.threshhold=WARN + +# +# Daily Rolling File Appender +# + +log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender +log4j.appender.DRFA.File=${hive.log.dir}/${hive.log.file} + +# Rollover at midnight +log4j.appender.DRFA.DatePattern=.yyyy-MM-dd + +# 30-day backup +#log4j.appender.DRFA.MaxBackupIndex=30 +log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout + +# Pattern format: Date LogLevel LoggerName LogMessage +#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n +# Debugging Pattern format +log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n + + +# +# console +# Add "console" to rootlogger above if you want to use this +# + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n + +#custom logging levels +#log4j.logger.xxx=DEBUG + +# +# Event Counter Appender +# Sends counts of logging messages at different severity levels to Hadoop Metrics. 
+# +log4j.appender.EventCounter=org.apache.hadoop.log.metrics.EventCounter + + +log4j.category.DataNucleus=ERROR,DRFA +log4j.category.Datastore=ERROR,DRFA +log4j.category.Datastore.Schema=ERROR,DRFA +log4j.category.JPOX.Datastore=ERROR,DRFA +log4j.category.JPOX.Plugin=ERROR,DRFA +log4j.category.JPOX.MetaData=ERROR,DRFA +log4j.category.JPOX.Query=ERROR,DRFA +log4j.category.JPOX.General=ERROR,DRFA +log4j.category.JPOX.Enhancer=ERROR,DRFA +log4j.logger.org.apache.hadoop.conf.Configuration=ERROR,DRFA + diff --git a/fe/src/test/resources/sentry-site.xml b/fe/src/test/resources/sentry-site.xml new file mode 100644 index 0000000000..d9c76cde1d --- /dev/null +++ b/fe/src/test/resources/sentry-site.xml @@ -0,0 +1,61 @@ + + + + + + + sentry.service.security.mode + none + + + sentry.service.admin.group + root + + + sentry.service.server.rpc-address + localhost + + + sentry.service.server.rpc-port + 30911 + + + sentry.service.client.server.rpc-address + localhost + + + sentry.service.client.server.rpc-port + 30911 + + + sentry.store.jdbc.url + jdbc:postgresql://localhost:5432/sentry_policy/;create=true + + + sentry.store.jdbc.user + hiveuser + + + sentry.store.jdbc.password + password + + + sentry.verify.schema.version + false + +