Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@
},
"devDependencies": {
"@effect/eslint-plugin": "^0.3.2",
"@effect/language-service": "^0.62.0",
"@effect/language-service": "^0.62.4",
"@effect/platform-node": "^0.104.0",
"@effect/vitest": "^0.27.0",
"@eslint/js": "^9.39.1",
"@types/node": "^25.0.0",
"@vitest/coverage-v8": "^4.0.15",
"@vitest/ui": "^4.0.15",
"effect": "^3.19.11",
"eslint": "^9.39.1",
"@eslint/js": "^9.39.2",
"@types/node": "^25.0.3",
"@vitest/coverage-v8": "^4.0.16",
"@vitest/ui": "^4.0.16",
"effect": "^3.19.13",
"eslint": "^9.39.2",
"eslint-import-resolver-typescript": "^4.4.4",
"eslint-plugin-import-x": "^4.16.1",
"eslint-plugin-simple-import-sort": "^12.1.1",
Expand All @@ -28,9 +29,9 @@
"globals": "^16.5.0",
"ts-patch": "^3.3.0",
"typescript": "^5.9.3",
"typescript-eslint": "^8.49.0",
"vite-tsconfig-paths": "^5.1.4",
"vitest": "^4.0.15",
"typescript-eslint": "^8.50.0",
"vite-tsconfig-paths": "^6.0.3",
"vitest": "^4.0.16",
"vitest-mock-express": "^2.2.0"
}
}
11 changes: 8 additions & 3 deletions packages/amp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,19 @@
"@bufbuild/protobuf": "^2.10.1",
"@connectrpc/connect": "^2.1.1",
"@connectrpc/connect-node": "^2.1.1",
"@effect/platform": "^0.94.0",
"effect": "^3.19.11"
},
"devDependencies": {
"@bufbuild/buf": "^1.61.0",
"@bufbuild/protobuf": "^2.10.1",
"@bufbuild/protoc-gen-es": "^2.10.1",
"@bufbuild/protobuf": "^2.10.2",
"@bufbuild/protoc-gen-es": "^2.10.2",
"@connectrpc/connect": "^2.1.1",
"@connectrpc/connect-node": "^2.1.1",
"effect": "^3.19.11"
"@effect/platform": "^0.94.0",
"effect": "^3.19.13"
},
"dependencies": {
"viem": "^2.43.2"
}
}
113 changes: 92 additions & 21 deletions packages/amp/src/ArrowFlight.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,93 @@
import { create, toBinary } from "@bufbuild/protobuf"
import { anyPack, AnySchema } from "@bufbuild/protobuf/wkt"
import { type Client, createClient, type Transport as ConnectTransport } from "@connectrpc/connect"
import {
type Client,
createClient,
createContextKey,
createContextValues,
type Interceptor,
type Transport as ConnectTransport
} from "@connectrpc/connect"
import * as Arr from "effect/Array"
import * as Console from "effect/Console"
import * as Context from "effect/Context"
import * as Effect from "effect/Effect"
import * as Layer from "effect/Layer"
import * as Option from "effect/Option"
import * as Redacted from "effect/Redacted"
import * as Schema from "effect/Schema"
import * as Stream from "effect/Stream"
import { Auth } from "./Auth.ts"
import { decodeDictionaryBatch, decodeRecordBatch, DictionaryRegistry } from "./internal/arrow-flight-ipc/Decoder.ts"
import { recordBatchToJson } from "./internal/arrow-flight-ipc/Json.ts"
import { readColumnValues } from "./internal/arrow-flight-ipc/Readers.ts"
import { parseDictionaryBatch, parseRecordBatch } from "./internal/arrow-flight-ipc/RecordBatch.ts"
import { type ArrowSchema, getMessageType, MessageHeaderType, parseSchema } from "./internal/arrow-flight-ipc/Schema.ts"
import type { AuthInfo } from "./Models.ts"
import { FlightDescriptor_DescriptorType, FlightDescriptorSchema, FlightService } from "./Protobuf/Flight_pb.ts"
import { CommandStatementQuerySchema } from "./Protobuf/FlightSql_pb.ts"

// =============================================================================
// Connect RPC Transport
// =============================================================================

/**
* A service which abstracts the underlying transport for a given client.
*
* A transport implements a protocol, such as Connect or gRPC-web, and allows
* for the concrete clients to be independent of the protocol.
*/
export class Transport extends Context.Tag("@edgeandnode/amp/Transport")<
Transport,
ConnectTransport
>() {}

/**
* A service which abstracts the set of interceptors that are passed to a given
* transport.
*
* An interceptor can add logic to clients or servers, similar to the decorators
* or middleware you may have seen in other libraries. Interceptors may
* mutate the request and response, catch errors and retry/recover, emit
* logs, or do nearly everything else.
*/
export class Interceptors extends Context.Reference<Interceptors>()(
"Amp/ArrowFlight/ConnectRPC/Interceptors",
{ defaultValue: () => Arr.empty<Interceptor>() }
) {}

const AuthInfoContextKey = createContextKey<AuthInfo | undefined>(
undefined,
{ description: "Authentication information obtained from the Amp auth server" }
)

/**
* A layer which will add an interceptor to the configured set of `Interceptors`
* which attempts to read authentication information from the Connect context
* values.
*
* If authentication information is found, the interceptor will add an
* `"Authorization"` header to the request containing a bearer token with the
* value of the authentication information access token.
*/
export const layerInterceptorBearerAuth = Layer.effectContext(
Effect.gen(function*() {
const interceptors = yield* Interceptors

const interceptor: Interceptor = (next) => (request) => {
const authInfo = request.contextValues.get(AuthInfoContextKey)

if (authInfo !== undefined) {
const accessToken = Redacted.value(authInfo.accessToken)
request.header.append("Authorization", `Bearer ${accessToken}`)
}
return next(request)
}

return Context.make(Interceptors, Arr.append(interceptors, interceptor))
})
)

// =============================================================================
// Errors
// =============================================================================
Expand All @@ -24,7 +97,7 @@ import { CommandStatementQuerySchema } from "./Protobuf/FlightSql_pb.ts"
* Represents the possible errors that can occur when executing an Arrow Flight
* query.
*/
export type ArrowFlightQueryError =
export type ArrowFlightError =
| RpcError
| NoEndpointsError
| MultipleEndpointsError
Expand Down Expand Up @@ -142,17 +215,30 @@ export class ArrowFlight extends Context.Tag("@edgeandnode/amp/ArrowFlight")<Arr
*/
readonly client: Client<typeof FlightService>

readonly query: (query: string) => Effect.Effect<unknown, ArrowFlightQueryError>
/**
* Executes an Arrow Flight SQL query and returns
*/
readonly query: (sql: string) => Effect.Effect<unknown, ArrowFlightError>
}>() {}

const make = Effect.gen(function*() {
const auth = yield* Effect.serviceOption(Auth)
const transport = yield* Transport
const client = createClient(FlightService, transport)

/**
* Execute a SQL query and return a stream of rows.
*/
const query = Effect.fn("ArrowFlight.request")(function*(query: string) {
// Setup the query context with authentication information, if available
const contextValues = createContextValues()
const authInfo = Option.isSome(auth)
? yield* auth.value.getCachedAuthInfo
: Option.none<AuthInfo>()
if (Option.isSome(authInfo)) {
contextValues.set(AuthInfoContextKey, authInfo.value)
}

const cmd = create(CommandStatementQuerySchema, { query })
const any = anyPack(CommandStatementQuerySchema, cmd)
const desc = create(FlightDescriptorSchema, {
Expand All @@ -161,7 +247,7 @@ const make = Effect.gen(function*() {
})

const flightInfo = yield* Effect.tryPromise({
try: (signal) => client.getFlightInfo(desc, { signal }),
try: (signal) => client.getFlightInfo(desc, { signal, contextValues }),
catch: (cause) => new RpcError({ cause, method: "getFlightInfo" })
})

Expand All @@ -183,7 +269,7 @@ const make = Effect.gen(function*() {
(controller) => Effect.sync(() => controller.abort())
)
return Stream.fromAsyncIterable(
client.doGet(ticket, { signal: controller.signal }),
client.doGet(ticket, { signal: controller.signal, contextValues }),
(cause) => new RpcError({ cause, method: "doGet" })
)
}))
Expand Down Expand Up @@ -236,19 +322,4 @@ const make = Effect.gen(function*() {
* A layer which constructs a concrete implementation of an `ArrowFlight`
* service and depends upon some implementation of a `Transport`.
*/
export const layer: Layer.Layer<ArrowFlight, ArrowFlightQueryError, Transport> = Layer.effect(ArrowFlight, make)

// =============================================================================
// Transport Service
// =============================================================================

/**
* A service which abstracts the underlying transport for a given client.
*
* A transport implements a protocol, such as Connect or gRPC-web, and allows
* for the concrete clients to be independent of the protocol.
*/
export class Transport extends Context.Tag("@edgeandnode/amp/Transport")<
Transport,
ConnectTransport
>() {}
export const layer: Layer.Layer<ArrowFlight, ArrowFlightError, Transport> = Layer.effect(ArrowFlight, make)
14 changes: 12 additions & 2 deletions packages/amp/src/ArrowFlight/Node.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
import { createGrpcTransport, type GrpcTransportOptions } from "@connectrpc/connect-node"
import * as Effect from "effect/Effect"
import * as Layer from "effect/Layer"
import { Transport } from "../ArrowFlight.ts"
import { Interceptors, Transport } from "../ArrowFlight.ts"

/**
* Create a `Transport` for the gRPC protocol using the Node.js `http2` module.
*/
export const layerTransportGrpc = (options: GrpcTransportOptions): Layer.Layer<Transport> =>
Layer.sync(Transport, () => createGrpcTransport(options))
Layer.effect(
Transport,
Effect.gen(function*() {
const interceptors = yield* Interceptors
return createGrpcTransport({
...options,
interceptors: [...(options.interceptors ?? []), ...interceptors]
})
})
)
Loading