Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
a15f787
add transform context type
fupelaqu Jan 12, 2026
bc70efd
add standard join model
fupelaqu Jan 12, 2026
140f794
add support for enrichment
fupelaqu Jan 12, 2026
3faf30b
to fix compilation bugs with scala 2.12
fupelaqu Jan 12, 2026
2b7f66b
add transform model
fupelaqu Jan 12, 2026
fdcf3dd
replace toJson: Map by node: JsonNode for transform model
fupelaqu Jan 13, 2026
cbd6e5d
fix SQL type for COUNT aggr
fupelaqu Jan 16, 2026
ac43961
update all identifiers within functions - update dependencies
fupelaqu Jan 16, 2026
b433a68
add naming utils, add isObject flag
fupelaqu Jan 16, 2026
8ccd0d3
use painless transform param if painless context is transform
fupelaqu Jan 16, 2026
57252da
update transform param name, update painless context creation using i…
fupelaqu Jan 16, 2026
082e1f2
fix transform param name
fupelaqu Jan 16, 2026
f0b5320
compute bucket selector script within having, add LatestValueTransfor…
fupelaqu Jan 16, 2026
1824bd2
init licensing module
fupelaqu Jan 16, 2026
e8a9b67
fix tranformCheckNotNull
fupelaqu Jan 16, 2026
1800957
add schema lineage
fupelaqu Jan 16, 2026
0c492be
implements implicit Criteria to JsonNode
fupelaqu Jan 16, 2026
11915fb
Merge branch 'main' into feature/licensing
fupelaqu Jan 16, 2026
e229b5e
fix parser specifications
fupelaqu Jan 17, 2026
9ff1b6c
implements extension registry
fupelaqu Jan 17, 2026
e9eb615
Merge remote-tracking branch 'origin/feature/licensing' into feature/…
fupelaqu Jan 17, 2026
a502899
update version
fupelaqu Jan 17, 2026
763c955
update version, add enrich policy and transform apis, add support for…
fupelaqu Jan 22, 2026
3914f46
add support for refresh materialized view ddl, add support for show m…
fupelaqu Jan 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package app.softnetwork.elastic.sql.bridge

import app.softnetwork.elastic.sql.PainlessContext
import app.softnetwork.elastic.sql.{PainlessContext, PainlessContextType}
import app.softnetwork.elastic.sql.`type`.SQLTemporal
import app.softnetwork.elastic.sql.query.{
Asc,
Expand Down Expand Up @@ -100,7 +100,10 @@ object ElasticAggregation {
having: Option[Criteria],
bucketsDirection: Map[String, SortOrder],
allAggregations: Map[String, SQLAggregation]
)(implicit timestamp: Long): ElasticAggregation = {
)(implicit
timestamp: Long,
contextType: PainlessContextType
): ElasticAggregation = {
import sqlAgg._
val sourceField = identifier.path

Expand Down Expand Up @@ -153,14 +156,14 @@ object ElasticAggregation {
buildScript: (String, Script) => Aggregation
): Aggregation = {
if (transformFuncs.nonEmpty) {
val context = PainlessContext()
val context = PainlessContext(context = contextType)
val scriptSrc = identifier.painless(Some(context))
val script = now(Script(s"$context$scriptSrc").lang("painless"))
buildScript(aggName, script)
} else {
aggType match {
case th: WindowFunction if th.shouldBeScripted =>
val context = PainlessContext()
val context = PainlessContext(context = contextType)
val scriptSrc = th.identifier.painless(Some(context))
val script = now(Script(s"$context$scriptSrc").lang("painless"))
buildScript(aggName, script)
Expand Down Expand Up @@ -348,7 +351,10 @@ object ElasticAggregation {
having: Option[Criteria],
nested: Option[NestedElement],
allElasticAggregations: Seq[ElasticAggregation]
)(implicit timestamp: Long): Seq[Aggregation] = {
)(implicit
timestamp: Long,
contextType: PainlessContextType = PainlessContextType.Query
): Seq[Aggregation] = {
for (tree <- buckets) yield {
val treeNodes =
tree.sortBy(_.level).reverse.foldLeft(Seq.empty[NodeAggregation]) { (current, node) =>
Expand All @@ -364,7 +370,7 @@ object ElasticAggregation {

val aggScript =
if (!bucket.isBucketScript && bucket.shouldBeScripted) {
val context = PainlessContext()
val context = PainlessContext(context = contextType)
val painless = bucket.painless(Some(context))
Some(now(Script(s"$context$painless").lang("painless")))
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package app.softnetwork.elastic.sql.bridge

import app.softnetwork.elastic.sql.PainlessContextType
import app.softnetwork.elastic.sql.operator.AND
import app.softnetwork.elastic.sql.query.{
BetweenExpr,
Expand All @@ -36,16 +37,16 @@ import app.softnetwork.elastic.sql.query.{
Predicate
}
import com.sksamuel.elastic4s.ElasticApi._
import com.sksamuel.elastic4s.requests.common.FetchSourceContext
import com.sksamuel.elastic4s.requests.searches.queries.{InnerHit, Query}

import scala.annotation.tailrec

case class ElasticBridge(filter: ElasticFilter) {
def query(
innerHitsNames: Set[String] = Set.empty,
currentQuery: Option[ElasticBoolQuery]
)(implicit timestamp: Long): Query = {
)(implicit
timestamp: Long,
contextType: PainlessContextType = PainlessContextType.Query
): Query = {
filter match {
case boolQuery: ElasticBoolQuery =>
import boolQuery._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@

package app.softnetwork.elastic.sql.bridge

import app.softnetwork.elastic.sql.PainlessContextType
import app.softnetwork.elastic.sql.query.Criteria
import com.sksamuel.elastic4s.requests.searches.queries.Query

case class ElasticCriteria(criteria: Criteria) {

def asQuery(group: Boolean = true, innerHitsNames: Set[String] = Set.empty)(implicit
timestamp: Long
timestamp: Long,
contextType: PainlessContextType = PainlessContextType.Query
): Query = {
val query = criteria.boolQuery.copy(group = group)
query
Expand Down
111 changes: 93 additions & 18 deletions bridge/src/main/scala/app/softnetwork/elastic/sql/bridge/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@ import app.softnetwork.elastic.sql.`type`.{
SQLTemporal,
SQLVarchar
}
import app.softnetwork.elastic.sql.config.ElasticSqlConfig
import app.softnetwork.elastic.sql.function.aggregate.COUNT
import app.softnetwork.elastic.sql.function.geo.{Distance, Meters}
import app.softnetwork.elastic.sql.operator._
import app.softnetwork.elastic.sql.query._
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.NullNode
import com.sksamuel.elastic4s.ElasticApi
import com.sksamuel.elastic4s.ElasticApi._
import com.sksamuel.elastic4s.requests.common.FetchSourceContext
import com.sksamuel.elastic4s.json.JacksonBuilder
import com.sksamuel.elastic4s.requests.script.Script
import com.sksamuel.elastic4s.requests.script.ScriptType.Source
import com.sksamuel.elastic4s.requests.searches.aggs.{
Expand All @@ -51,17 +54,30 @@ import scala.language.implicitConversions

package object bridge {

def now(script: Script)(implicit timestamp: Long): Script = {
lazy val sqlConfig: ElasticSqlConfig = ElasticSqlConfig()

def now(script: Script)(implicit
timestamp: Long,
contextType: PainlessContextType = PainlessContextType.Query
): Script = {
if (!script.script.contains("params.__now__")) {
return script
}
script.param("__now__", timestamp)
contextType match {
case PainlessContextType.Query => script.param("__now__", timestamp)
case PainlessContextType.Transform =>
script.param("__now__", sqlConfig.transformLastUpdatedColumnName)
case _ => script
}
}

implicit def requestToNestedFilterAggregation(
request: SingleSearch,
innerHitsName: String
)(implicit timestamp: Long): Option[FilterAggregation] = {
)(implicit
timestamp: Long,
contextType: PainlessContextType = PainlessContextType.Query
): Option[FilterAggregation] = {
val having: Option[Query] =
request.having.flatMap(_.criteria) match {
case Some(f) =>
Expand Down Expand Up @@ -137,7 +153,10 @@ package object bridge {

implicit def requestToFilterAggregation(
request: SingleSearch
)(implicit timestamp: Long): Option[FilterAggregation] =
)(implicit
timestamp: Long,
contextType: PainlessContextType = PainlessContextType.Query
): Option[FilterAggregation] =
request.having.flatMap(_.criteria) match {
case Some(f) =>
val boolQuery = Option(ElasticBoolQuery(group = true))
Expand All @@ -155,7 +174,10 @@ package object bridge {
implicit def requestToRootAggregations(
request: SingleSearch,
aggregations: Seq[ElasticAggregation]
)(implicit timestamp: Long): Seq[AbstractAggregation] = {
)(implicit
timestamp: Long,
contextType: PainlessContextType = PainlessContextType.Query
): Seq[AbstractAggregation] = {
val notNestedAggregations = aggregations.filterNot(_.nested)

val notNestedBuckets = request.bucketTree.filterNot(_.bucket.nested)
Expand Down Expand Up @@ -207,7 +229,10 @@ package object bridge {
implicit def requestToScopedAggregations(
request: SingleSearch,
aggregations: Seq[ElasticAggregation]
)(implicit timestamp: Long): Seq[NestedAggregation] = {
)(implicit
timestamp: Long,
contextType: PainlessContextType = PainlessContextType.Query
): Seq[NestedAggregation] = {
// Group nested aggregations by their nested path
val nestedAggregations: Map[String, Seq[ElasticAggregation]] = aggregations
.filter(_.nested)
Expand Down Expand Up @@ -413,7 +438,8 @@ package object bridge {
}

implicit def requestToElasticSearchRequest(request: SingleSearch)(implicit
timestamp: Long
timestamp: Long,
contextType: PainlessContextType = PainlessContextType.Query
): ElasticSearchRequest =
ElasticSearchRequest(
request.sql,
Expand All @@ -431,7 +457,10 @@ package object bridge {

implicit def requestToSearchRequest(
request: SingleSearch
)(implicit timestamp: Long): SearchRequest = {
)(implicit
timestamp: Long,
contextType: PainlessContextType = PainlessContextType.Query
): SearchRequest = {
import request._

val aggregations = request.aggregates.map(
Expand Down Expand Up @@ -491,7 +520,7 @@ package object bridge {
case Nil => _search
case _ =>
_search scriptfields scriptFields.map { field =>
val context = PainlessContext()
val context = PainlessContext(context = contextType)
val script = field.painless(Some(context))
scriptField(
field.scriptName,
Expand All @@ -512,7 +541,7 @@ package object bridge {
case Some(o) if aggregates.isEmpty && buckets.isEmpty =>
_search sortBy o.sorts.map { sort =>
if (sort.isScriptSort) {
val context = PainlessContext()
val context = PainlessContext(context = contextType)
val painless = sort.field.painless(Some(context))
val painlessScript = s"$context$painless"
val script =
Expand Down Expand Up @@ -571,7 +600,10 @@ package object bridge {

implicit def requestToMultiSearchRequest(
request: MultiSearch
)(implicit timestamp: Long): MultiSearchRequest = {
)(implicit
timestamp: Long,
contextType: PainlessContextType = PainlessContextType.Query
): MultiSearchRequest = {
MultiSearchRequest(
request.requests.map(implicitly[SearchRequest](_))
)
Expand All @@ -582,7 +614,10 @@ package object bridge {
doubleOp: Double => A
): A = n.toEither.fold(longOp, doubleOp)

implicit def expressionToQuery(expression: GenericExpression)(implicit timestamp: Long): Query = {
implicit def expressionToQuery(expression: GenericExpression)(implicit
timestamp: Long,
contextType: PainlessContextType = PainlessContextType.Query
): Query = {
import expression._
if (isAggregation)
return matchAllQuery()
Expand All @@ -592,7 +627,7 @@ package object bridge {
case _ => true
}))
) {
val context = PainlessContext()
val context = PainlessContext(context = contextType)
val script = painless(Some(context))
return scriptQuery(
now(Script(script = s"$context$script").lang("painless").scriptType("source"))
Expand Down Expand Up @@ -810,7 +845,7 @@ package object bridge {
case NE | DIFF => not(rangeQuery(identifier.name) gte script lte script)
}
case _ =>
val context = PainlessContext()
val context = PainlessContext(context = contextType)
val script = painless(Some(context))
scriptQuery(
now(
Expand All @@ -821,7 +856,7 @@ package object bridge {
)
}
case _ =>
val context = PainlessContext()
val context = PainlessContext(context = contextType)
val script = painless(Some(context))
scriptQuery(
now(
Expand Down Expand Up @@ -884,7 +919,10 @@ package object bridge {

implicit def betweenToQuery(
between: BetweenExpr
)(implicit timestamp: Long): Query = {
)(implicit
timestamp: Long,
contextType: PainlessContextType = PainlessContextType.Query
): Query = {
import between._
// Geo distance special case
identifier.functions.headOption match {
Expand Down Expand Up @@ -1007,6 +1045,40 @@ package object bridge {
)
}

implicit def queryToJson(
query: Query
): JsonNode = {
JacksonBuilder.toNode(
SearchBodyBuilderFn(
ElasticApi.search("") query {
query
}
).value
) match {
case Left(node: JsonNode) =>
if (node.has("query")) {
node.get("query")
} else {
node
}
case Right(_) => NullNode.instance
}
}

implicit def criteriaToQuery(criteria: Criteria)(implicit
timestamp: Long,
contextType: PainlessContextType = PainlessContextType.Query
): Query = {
ElasticCriteria(criteria).asQuery()
}

implicit def criteriaToNode(criteria: Criteria)(implicit
timestamp: Long,
contextType: PainlessContextType = PainlessContextType.Query
): JsonNode = {
queryToJson(criteriaToQuery(criteria))
}

implicit def filterToQuery(
filter: ElasticFilter
): ElasticBridge = {
Expand All @@ -1015,7 +1087,10 @@ package object bridge {

implicit def sqlQueryToAggregations(
query: SelectStatement
)(implicit timestamp: Long): Seq[ElasticAggregation] = {
)(implicit
timestamp: Long,
contextType: PainlessContextType = PainlessContextType.Query
): Seq[ElasticAggregation] = {
import query._
statement
.map {
Expand Down
17 changes: 15 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import app.softnetwork.Publish
import scala.collection.Seq
import SoftClient4es.*
import app.softnetwork.*
import sbt.Def
import sbtbuildinfo.BuildInfoKeys.buildInfoObject

Expand All @@ -19,7 +20,7 @@ ThisBuild / organization := "app.softnetwork"

name := "softclient4es"

ThisBuild / version := "0.15.0"
ThisBuild / version := "0.16-SNAPSHOT"

ThisBuild / scalaVersion := scala213

Expand Down Expand Up @@ -99,6 +100,14 @@ ThisBuild / libraryDependencySchemes += "org.scala-lang.modules" %% "scala-xml"

Test / parallelExecution := false

lazy val licensing = project
.in(file("licensing"))
.configs(IntegrationTest)
.settings(
Defaults.itSettings,
moduleSettings
)

lazy val sql = project
.in(file("sql"))
.configs(IntegrationTest)
Expand Down Expand Up @@ -142,6 +151,9 @@ lazy val core = project
.dependsOn(
macros % "compile->compile;test->test;it->it"
)
.dependsOn(
licensing % "compile->compile;test->test;it->it"
)

lazy val persistence = project
.in(file("persistence"))
Expand Down Expand Up @@ -461,6 +473,7 @@ lazy val root = project
crossScalaVersions := Nil
)
.aggregate(
licensing,
sql,
bridge,
macros,
Expand Down
Loading