Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions common/thrift/CatalogObjects.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,12 @@ struct TDataSourceTable {
2: required string init_string
}

// Represents a point
struct TPoint {
1: required double x
2: required double y
}

// Represents a rectangle used in a global index record for spatial tables.
struct TRectangle {
1: required double x1
Expand Down
30 changes: 27 additions & 3 deletions fe/src/main/cup/sql-parser.y
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@

package com.cloudera.impala.analysis;

import org.gistic.spatialImpala.catalog.Point;
import org.gistic.spatialImpala.catalog.Rectangle;
import org.gistic.spatialImpala.analysis.SpatialPointInclusionStmt;
import org.gistic.spatialImpala.analysis.SpatialKnnStmt;

import com.cloudera.impala.catalog.Type;
import com.cloudera.impala.catalog.ScalarType;
Expand Down Expand Up @@ -242,10 +244,10 @@ terminal
KW_FIRST, KW_FLOAT, KW_FOLLOWING, KW_FOR, KW_FORMAT, KW_FORMATTED, KW_FROM, KW_FULL,
KW_FUNCTION, KW_FUNCTIONS, KW_GRANT, KW_GROUP, KW_HAVING, KW_IF, KW_IN, KW_INIT_FN,
KW_INNER, KW_INPATH, KW_INSERT, KW_INT, KW_INTERMEDIATE, KW_INTERVAL, KW_INTO,
KW_INVALIDATE, KW_IS, KW_JOIN, KW_LAST, KW_LEFT, KW_LIKE, KW_LIMIT, KW_LINES, KW_LOAD,
KW_INVALIDATE, KW_IS, KW_JOIN, KW_KNN, KW_LAST, KW_LEFT, KW_LIKE, KW_LIMIT, KW_LINES, KW_LOAD,
KW_LOCATION, KW_MAP, KW_MERGE_FN, KW_METADATA, KW_NOT, KW_NULL, KW_NULLS, KW_OFFSET,
KW_ON, KW_OR, KW_ORDER, KW_OUTER, KW_OVER, KW_OVERLAPS, KW_OVERWRITE, KW_PARQUET, KW_PARQUETFILE,
KW_PARTITION, KW_PARTITIONED, KW_PARTITIONS, KW_POINTS, KW_PRECEDING,
KW_PARTITION, KW_PARTITIONED, KW_PARTITIONS, KW_POINT, KW_POINTS, KW_PRECEDING,
KW_PREPARE_FN, KW_PRODUCED, KW_RANGE, KW_RCFILE, KW_RECTANGLE, KW_REFRESH, KW_REGEXP, KW_RENAME,
KW_REPLACE, KW_RETURNS, KW_REVOKE, KW_RIGHT, KW_RLIKE, KW_ROLE, KW_ROLES, KW_ROW,
KW_ROWS, KW_SCHEMA, KW_SCHEMAS, KW_SELECT, KW_SEMI, KW_SEQUENCEFILE, KW_SERDEPROPERTIES,
Expand All @@ -254,7 +256,7 @@ terminal
KW_TEXTFILE, KW_THEN,
KW_TIMESTAMP, KW_TINYINT, KW_STATS, KW_TO, KW_TRUE, KW_UNBOUNDED, KW_UNCACHED,
KW_UNION, KW_UPDATE_FN, KW_USE, KW_USING,
KW_VALUES, KW_VARCHAR, KW_VIEW, KW_WHEN, KW_WHERE, KW_WITH;
KW_VALUES, KW_VARCHAR, KW_VIEW, KW_WHEN, KW_WHERE, KW_WITH, KW_WITHDIST, KW_WITHK;

terminal COLON, COMMA, DOT, DOTDOTDOT, STAR, LPAREN, RPAREN, LBRACKET, RBRACKET,
DIVIDE, MOD, ADD, SUBTRACT;
Expand Down Expand Up @@ -445,6 +447,9 @@ nonterminal CreateFunctionStmtBase.OptArg create_function_arg_key;
nonterminal SpatialPointInclusionStmt spatial_point_inclusion_stmt;
nonterminal Rectangle rectangle;
nonterminal NumericLiteral rectangle_arg;
nonterminal SpatialKnnStmt spatial_knn_stmt;
nonterminal Point point;
nonterminal NumericLiteral point_arg;

precedence left KW_OR;
precedence left KW_AND;
Expand Down Expand Up @@ -559,27 +564,46 @@ stmt ::=
{: RESULT = revoke_privilege; :}
| spatial_point_inclusion_stmt:spatial_point_inclusion
{: RESULT = spatial_point_inclusion; :}
| spatial_knn_stmt:spatial_knn
{: RESULT = spatial_knn; :}
;

spatial_point_inclusion_stmt ::=
KW_LOAD KW_POINTS KW_FROM KW_TABLE table_name:tbl KW_OVERLAPS rectangle:rect
{: RESULT = new SpatialPointInclusionStmt(tbl, rect); :}
;

spatial_knn_stmt ::=
KW_LOAD KW_POINTS KW_FROM KW_TABLE table_name:tbl KW_KNN point:p KW_WITHK expr:k KW_WITHDIST order_by_elements:dist
{: RESULT = new SpatialKnnStmt(tbl,p,new LimitElement(k, null), dist); :}
;

rectangle ::=
KW_RECTANGLE LPAREN rectangle_arg:x1 COMMA rectangle_arg:y1 COMMA
rectangle_arg:x2 COMMA rectangle_arg:y2 RPAREN
{: RESULT = new Rectangle(x1.getDoubleValue(), y1.getDoubleValue(),
x2.getDoubleValue(), y2.getDoubleValue()); :}
;

point ::=
KW_POINT LPAREN point_arg:x COMMA point_arg:y RPAREN
{: RESULT = new Point(x.getDoubleValue(), y.getDoubleValue()); :}
;

rectangle_arg ::=
INTEGER_LITERAL:l
{: RESULT = new NumericLiteral(l); :}
| DECIMAL_LITERAL:l
{: RESULT = new NumericLiteral(l); :}
;

point_arg ::=
INTEGER_LITERAL:l
{: RESULT = new NumericLiteral(l); :}
| DECIMAL_LITERAL:l
{: RESULT = new NumericLiteral(l); :}
;

load_stmt ::=
KW_LOAD KW_DATA KW_INPATH STRING_LITERAL:path overwrite_val:overwrite KW_INTO KW_TABLE
table_name:table opt_partition_spec:partition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ static public class AnalysisResult {
public boolean isComputeStatsStmt() { return stmt_ instanceof ComputeStatsStmt; }
public boolean isQueryStmt() {
return (stmt_ instanceof QueryStmt
|| stmt_ instanceof SpatialPointInclusionStmt);
|| stmt_ instanceof SpatialPointInclusionStmt
|| stmt_ instanceof SpatialKnnStmt);
}
public boolean isInsertStmt() { return stmt_ instanceof InsertStmt; }
public boolean isDropDbStmt() { return stmt_ instanceof DropDbStmt; }
Expand Down Expand Up @@ -212,6 +213,8 @@ public QueryStmt getQueryStmt() {
return (QueryStmt) stmt_;
else if (stmt_ instanceof SpatialPointInclusionStmt)
return ((SpatialPointInclusionStmt) stmt_).getSelectStmtIfAny();
else if (stmt_ instanceof SpatialKnnStmt)
return ((SpatialKnnStmt) stmt_).getSelectStmtIfAny();
else
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
/**
* Combination of limit and offset expressions.
*/
class LimitElement {
public class LimitElement {
private final Expr limitExpr_;
private final Expr offsetExpr_;
private long limit_;
Expand Down
137 changes: 137 additions & 0 deletions fe/src/main/java/org/gistic/spatialImpala/analysis/SpatialKnnStmt.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright 2014 GISTIC.

package org.gistic.spatialImpala.analysis;

import org.gistic.spatialImpala.catalog.*;

import com.cloudera.impala.catalog.Table;
import com.cloudera.impala.authorization.Privilege;
import com.cloudera.impala.common.AnalysisException;
import com.cloudera.impala.analysis.Analyzer;
import com.cloudera.impala.analysis.StatementBase;
import com.cloudera.impala.analysis.SelectStmt;
import com.cloudera.impala.analysis.SelectListItem;
import com.cloudera.impala.analysis.SelectList;
import com.cloudera.impala.analysis.TableName;
import com.cloudera.impala.analysis.TableRef;
import com.cloudera.impala.analysis.SlotRef;
import com.cloudera.impala.analysis.IsNullPredicate;
import com.cloudera.impala.analysis.Expr;
import com.cloudera.impala.analysis.CompoundPredicate;
import com.cloudera.impala.analysis.BinaryPredicate;
import com.cloudera.impala.analysis.IsNullPredicate;
import com.cloudera.impala.analysis.StringLiteral;
import com.cloudera.impala.analysis.NumericLiteral;
import com.cloudera.impala.analysis.LimitElement;
import com.cloudera.impala.analysis.OrderByElement;

import java.util.List;
import java.util.ArrayList;
import java.math.BigDecimal;

/**
* Represents a Spatial Point Inclusion statement
*/
public class SpatialKnnStmt extends StatementBase {
private static final String TABLE_NOT_SPATIAL_ERROR_MSG = "Table is not a spatial table.";
private static final String TAG = "tag";
private static final String X = "x";
private static final String Y = "y";
private TableName tableName_;
private final Point p_;
private final LimitElement k_;
private final ArrayList<OrderByElement> dist_;

// Initialized during analysis.
private SelectStmt selectStmt_;

public SpatialKnnStmt(TableName tblName, Point p, LimitElement k, ArrayList<OrderByElement> dist) {
this.tableName_ = tblName;
this.p_ = p;
this.k_ = k;
this.dist_=dist;
this.selectStmt_ = null;
}

@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
// Getting table and checking for existence.
Table table;
if (!tableName_.isFullyQualified()) {
tableName_ = new TableName(analyzer.getDefaultDb(),
tableName_.getTbl());
}
table = analyzer.getTable(tableName_, Privilege.SELECT);

// Table should be an instance of a Spatial table.
if (!(table instanceof SpatialHdfsTable))
throw new AnalysisException(TABLE_NOT_SPATIAL_ERROR_MSG);

SpatialHdfsTable spatialTable = (SpatialHdfsTable) table;

// Global index shouldn't be null.
GlobalIndex globalIndex = spatialTable.getGlobalIndexIfAny();
if (globalIndex == null)
throw new AnalysisException(TABLE_NOT_SPATIAL_ERROR_MSG
+ " : Table doesn't have global indexes.");

List<String> columnNames = spatialTable.getColumnNames();
if (!(columnNames.contains(TAG) && columnNames.contains(X) && columnNames
.contains(Y)))
throw new AnalysisException(TABLE_NOT_SPATIAL_ERROR_MSG
+ " : Table doesn't have the required columns.");

List<GlobalIndexRecord> globalIndexes = globalIndex
.getGIsforKnn(p_);

// Preparing data for SelectStmt.
List<TableRef> tableRefs = new ArrayList<TableRef>();
tableRefs.add(new TableRef(tableName_, null));

List<SelectListItem> items = new ArrayList<SelectListItem>();
items.add(new SelectListItem(new SlotRef(tableName_, X), null));
items.add(new SelectListItem(new SlotRef(tableName_, Y), null));

selectStmt_ = new SelectStmt(new SelectList(items), tableRefs,
createWherePredicate(globalIndexes), null, null, dist_, k_);

selectStmt_.analyze(analyzer);
}

@Override
public String toSql() {
return "load points from table " + tableName_.getTbl()
+ " knn " + p_.toString() + "with_k " + k_ + ";";
}

public SelectStmt getSelectStmtIfAny() {
return selectStmt_;
}

private Expr createWherePredicate(List<GlobalIndexRecord> globalIndexes) {
SlotRef globalIndexSlotRef = new SlotRef(tableName_, TAG);
if (globalIndexes == null || globalIndexes.size() == 0) {
return new IsNullPredicate(globalIndexSlotRef, false);
}

// Creating Where predicate.
Expr wherePredicate = null;

// Create Global Index predicate.
wherePredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ,
globalIndexSlotRef, new StringLiteral(globalIndexes.get(0)
.getTag()));

for (int i = 1; i < globalIndexes.size(); i++) {
Expr globalIndexPredicate = new BinaryPredicate(
BinaryPredicate.Operator.EQ, globalIndexSlotRef,
new StringLiteral(globalIndexes.get(i).getTag()));

wherePredicate = new CompoundPredicate(
CompoundPredicate.Operator.OR, wherePredicate,
globalIndexPredicate);
}

return wherePredicate;
}
}
18 changes: 16 additions & 2 deletions fe/src/main/java/org/gistic/spatialImpala/catalog/GlobalIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,21 @@ public List<GlobalIndexRecord> getGIsforPoint(int x, int y) {
}
return globalIndexes;
}


public List<GlobalIndexRecord> getGIsforKnn(Point pt) {
double maxdist = 0;
for (GlobalIndexRecord gIRecord : globalIndexMap.values()) {
if (gIRecord.getMBR().includesPoint(pt.getX(), pt.getY()))
maxdist = Math.max(maxdist, gIRecord.getMBR().getMaxDist(pt.getX(), pt.getY()));
}
List<GlobalIndexRecord> globalIndexes = new ArrayList<GlobalIndexRecord>();
for (GlobalIndexRecord gIRecord : globalIndexMap.values()) {
if (gIRecord.getMBR().getMinDist(pt.getX(), pt.getY()) < maxdist )
globalIndexes.add(gIRecord);
}
return globalIndexes;
}

public List<GlobalIndexRecord> getGIsforRectangle(Rectangle rect) {
List<GlobalIndexRecord> globalIndexes = new ArrayList<GlobalIndexRecord>();
for (GlobalIndexRecord gIRecord : globalIndexMap.values()) {
Expand Down Expand Up @@ -161,4 +175,4 @@ public static GlobalIndex fromThrift(TGlobalIndex tGlobalIndex) {
}
return new GlobalIndex(tGlobalIndex.getTbl_name(), gIMap);
}
}
}
36 changes: 36 additions & 0 deletions fe/src/main/java/org/gistic/spatialImpala/catalog/Point.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2014 GISTIC.

package org.gistic.spatialImpala.catalog;

import com.cloudera.impala.thrift.TPoint;

public class Point {
private double x;
private double y;

public Point(double x, double y) {
this.x = x;
this.y = y;
}

public TPoint toThrift() {
return new TPoint(this.x, this.y);
}

public static Point fromThrift(TPoint p) {
return new Point(p.getX(), p.getY());
}

public double getX() {
return x;
}

public double getY() {
return y;
}

@Override
public String toString() {
return "(" + x + ", " + y + ")";
}
}
16 changes: 14 additions & 2 deletions fe/src/main/java/org/gistic/spatialImpala/catalog/Rectangle.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,21 @@ public Rectangle(double x1, double y1, double x2, double y2) {
this.y2 = y2;
}

public boolean includesPoint(int x, int y) {
public boolean includesPoint(double x, double y) {
return (x >= x1) && (x <= x2) && (y >= y1) && (y <= y2);
}

public double getMaxDist(double x, double y) {
return Math.max(Math.max(dist(x,y,x1,y1),dist(x,y,x2,y1)),Math.max(dist(x,y,x1,y2),dist(x,y,x2,y2)));
}

public double getMinDist(double x, double y) {
return Math.min(Math.min(dist(x,y,x1,y1),dist(x,y,x2,y1)),Math.min(dist(x,y,x1,y2),dist(x,y,x2,y2)));
}

private double dist(double x1, double y1, double X2, double y2) {
return Math.sqrt( (x1 - x2) * (x1 - x2) + (y1 - y2) * (y1 - y2) );
}

public boolean overlaps(Rectangle rect) {
if (this.x1 > rect.x2 || this.x2 < rect.x1)
Expand Down Expand Up @@ -60,4 +72,4 @@ public String toString() {
return "(" + x1 + ", " + y1 + ", "
+ x2 + ", " + y2 + ")";
}
}
}
4 changes: 4 additions & 0 deletions fe/src/main/jflex/sql-scanner.flex
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ import com.cloudera.impala.analysis.SqlParserSymbols;
keywordMap.put("invalidate", new Integer(SqlParserSymbols.KW_INVALIDATE));
keywordMap.put("is", new Integer(SqlParserSymbols.KW_IS));
keywordMap.put("join", new Integer(SqlParserSymbols.KW_JOIN));
keywordMap.put("knn", new Integer(SqlParserSymbols.KW_KNN));
keywordMap.put("last", new Integer(SqlParserSymbols.KW_LAST));
keywordMap.put("left", new Integer(SqlParserSymbols.KW_LEFT));
keywordMap.put("like", new Integer(SqlParserSymbols.KW_LIKE));
Expand Down Expand Up @@ -158,6 +159,7 @@ import com.cloudera.impala.analysis.SqlParserSymbols;
keywordMap.put("partition", new Integer(SqlParserSymbols.KW_PARTITION));
keywordMap.put("partitioned", new Integer(SqlParserSymbols.KW_PARTITIONED));
keywordMap.put("partitions", new Integer(SqlParserSymbols.KW_PARTITIONS));
keywordMap.put("point", new Integer(SqlParserSymbols.KW_POINT));
keywordMap.put("points", new Integer(SqlParserSymbols.KW_POINTS));
keywordMap.put("preceding", new Integer(SqlParserSymbols.KW_PRECEDING));
keywordMap.put("prepare_fn", new Integer(SqlParserSymbols.KW_PREPARE_FN));
Expand Down Expand Up @@ -216,6 +218,8 @@ import com.cloudera.impala.analysis.SqlParserSymbols;
keywordMap.put("when", new Integer(SqlParserSymbols.KW_WHEN));
keywordMap.put("where", new Integer(SqlParserSymbols.KW_WHERE));
keywordMap.put("with", new Integer(SqlParserSymbols.KW_WITH));
keywordMap.put("with_dist", new Integer(SqlParserSymbols.KW_WITHDIST));
keywordMap.put("with_k", new Integer(SqlParserSymbols.KW_WITHK));
}

// map from token id to token description
Expand Down
Loading