-
Notifications
You must be signed in to change notification settings - Fork 6
Add websocket handler #54
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,199 @@ | ||
| import type { | ||
| APIGatewayProxyWebsocketEventV2, | ||
| APIGatewayProxyWebsocketHandlerV2, | ||
| Context, | ||
| } from 'aws-lambda'; | ||
| import { stringifyError } from '../utils'; | ||
| import { getLogger } from '../utils/logger'; | ||
| import { HandlerPluginBase } from './base'; | ||
| import { | ||
| WebSocketHandler, | ||
| WebSocketHandlerAuxBase, | ||
| WebSocketHandlerRequest, | ||
| WebSocketHandlerResponse, | ||
| } from './websocketBase'; | ||
|
|
||
| const logger = getLogger(__filename); | ||
|
|
||
| type WebSocketDelegator = () => Promise<WebSocketHandlerResponse | undefined>; | ||
|
|
||
| class WebSocketHandlerMiddleware<A extends WebSocketHandlerAuxBase> { | ||
| public auxPromise: Promise<A>; | ||
| public plugins: Array<HandlerPluginBase<any>>; | ||
|
|
||
| constructor(plugins: Array<HandlerPluginBase<any>>) { | ||
| this.plugins = plugins; | ||
| this.auxPromise = this.createAuxPromise(); | ||
| } | ||
|
|
||
| private createAuxPromise = (): Promise<A> => { | ||
| return !this.plugins || this.plugins.length === 0 | ||
| ? Promise.resolve({} as A) // tslint:disable-line | ||
| : Promise.all( | ||
| this.plugins.map((plugin) => { | ||
| const maybePromise = plugin.create(); | ||
| return maybePromise instanceof Promise | ||
| ? maybePromise | ||
| : Promise.resolve(maybePromise); | ||
| }), | ||
| ).then( | ||
| (auxes) => | ||
| auxes.reduce((all, each) => ({ ...all, ...each }), {}) as A, | ||
| ); | ||
| }; | ||
| } | ||
|
|
||
| class WebSocketHandlerProxy<A extends WebSocketHandlerAuxBase> { | ||
| private request: WebSocketHandlerRequest; | ||
| private aux: A; | ||
| private result: WebSocketHandlerResponse; | ||
|
|
||
| public constructor(event: APIGatewayProxyWebsocketEventV2, context: Context) { | ||
| logger.stupid(`WebSocket event`, event); | ||
| this.request = new WebSocketHandlerRequest(event, context); | ||
| this.aux = {} as A; // tslint:disable-line | ||
| this.result = { statusCode: 200 }; | ||
| } | ||
|
|
||
| public call = async ( | ||
| middleware: WebSocketHandlerMiddleware<A>, | ||
| handler: WebSocketHandler<A>, | ||
| ): Promise<WebSocketHandlerResponse> => { | ||
| try { | ||
| this.aux = await middleware.auxPromise; | ||
| } catch (error) { | ||
| logger.error( | ||
| `Error while initializing plugins' aux: ${stringifyError(error)}`, | ||
| ); | ||
| return { | ||
| statusCode: 500, | ||
| body: JSON.stringify( | ||
| error instanceof Error ? { error: error.message } : error, | ||
| ), | ||
| }; | ||
| } | ||
|
|
||
| const actualHandler = [this.generateHandlerDelegator(handler)]; | ||
| const beginHandlers = middleware.plugins.map((plugin) => | ||
| this.generatePluginDelegator(plugin.begin), | ||
| ); | ||
| const endHandlers = middleware.plugins.map((plugin) => | ||
| this.generatePluginDelegator(plugin.end), | ||
| ); | ||
| const errorHandlers = middleware.plugins.map((plugin) => | ||
| this.generatePluginDelegator(plugin.error), | ||
| ); | ||
|
|
||
| const iterate = async (handlers: WebSocketDelegator[]) => | ||
| Promise.all(handlers.map((each) => this.safeCall(each, errorHandlers))); | ||
|
|
||
| const results = [ | ||
| ...(await iterate(beginHandlers)), | ||
| ...(await iterate(actualHandler)), | ||
| ...(await iterate(endHandlers)), | ||
| ].filter((x) => x); | ||
|
|
||
| // In test phase, throws any exception if there was. | ||
| if (process.env.NODE_ENV === 'test') { | ||
| for (const each of results) { | ||
| if (each instanceof Error) { | ||
| logger.error(`Error occurred: ${stringifyError(each)}`); | ||
| throw each; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| results.forEach((result) => | ||
| logger.silly(`WebSocket middleware result: ${JSON.stringify(result)}`), | ||
| ); | ||
|
|
||
| return this.result; | ||
| }; | ||
|
|
||
| private safeCall = async ( | ||
| delegator: WebSocketDelegator, | ||
| errorHandlers: WebSocketDelegator[], | ||
| ) => { | ||
| try { | ||
| const result = await delegator(); | ||
| return result; | ||
| } catch (error) { | ||
| const handled = await this.handleError(error, errorHandlers); | ||
| return handled; | ||
| } | ||
| }; | ||
|
|
||
| private generateHandlerDelegator = | ||
| (handler: WebSocketHandler<A>): WebSocketDelegator => | ||
| async () => { | ||
| const maybePromise = handler({ | ||
| request: this.request, | ||
| response: undefined, // WebSocket doesn't use response | ||
| aux: this.aux, | ||
| }); | ||
| const result = | ||
| maybePromise instanceof Promise ? await maybePromise : maybePromise; | ||
| logger.stupid(`WebSocket handler result`, result); | ||
|
|
||
| if (result) { | ||
| this.result = result; | ||
| } | ||
| return result; | ||
| }; | ||
|
|
||
| private generatePluginDelegator = | ||
| (pluginCallback: (context: any) => any): WebSocketDelegator => | ||
| async () => { | ||
| const maybePromise = pluginCallback({ | ||
| request: this.request, | ||
| response: undefined, // WebSocket doesn't use response (for HTTP plugin compatibility) | ||
| aux: this.aux, | ||
| }); | ||
| const result = | ||
| maybePromise instanceof Promise ? await maybePromise : maybePromise; | ||
| logger.stupid(`WebSocket plugin callback result`, result); | ||
| return result; | ||
| }; | ||
|
|
||
| private handleError = async ( | ||
| error: Error, | ||
| errorHandlers?: WebSocketDelegator[], | ||
| ) => { | ||
| logger.error(error); | ||
| this.request.lastError = error; | ||
|
|
||
| if (errorHandlers) { | ||
| for (const handler of errorHandlers) { | ||
| try { | ||
| await handler(); | ||
| } catch (ignorable) { | ||
| logger.error(ignorable); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| this.result = { | ||
| statusCode: 500, | ||
| body: JSON.stringify( | ||
| error instanceof Error ? { error: error.message } : error, | ||
| ), | ||
| }; | ||
|
|
||
| return error; | ||
| }; | ||
| } | ||
|
|
||
| const buildWebSocket = <Aux extends WebSocketHandlerAuxBase>( | ||
| plugins: Array<HandlerPluginBase<any>>, | ||
| ) => { | ||
| const middleware = new WebSocketHandlerMiddleware<Aux>(plugins); | ||
| return (handler: WebSocketHandler<Aux>): APIGatewayProxyWebsocketHandlerV2 => | ||
| async (event: APIGatewayProxyWebsocketEventV2, context: Context) => { | ||
| return new WebSocketHandlerProxy<Aux>(event, context).call( | ||
| middleware, | ||
| handler, | ||
| ); | ||
| }; | ||
| }; | ||
|
|
||
| export default buildWebSocket; | ||
mintae-v6x marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. base.ts 파일과 거의 비슷합니다. 다만 웹소켓 통신의 특성상 http 통신과 달리 header가 존재하지 않습니다. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,83 @@ | ||
| import type { APIGatewayProxyWebsocketEventV2, Context } from 'aws-lambda'; | ||
| import { getLogger } from '../utils/logger'; | ||
|
|
||
| const logger = getLogger(__filename); | ||
|
|
||
| export interface WebSocketHandlerAuxBase { | ||
| [key: string]: any; | ||
| } | ||
|
|
||
| export class WebSocketHandlerRequest { | ||
| public event: APIGatewayProxyWebsocketEventV2; | ||
| public context: Context; | ||
| public lastError: Error | string | undefined; | ||
|
|
||
| private lazyBody?: any; | ||
|
|
||
| constructor(event: APIGatewayProxyWebsocketEventV2, context: Context) { | ||
| this.event = event; | ||
| this.context = context; | ||
| this.lastError = undefined; | ||
| } | ||
|
|
||
| get body() { | ||
| if (!this.event.body) { | ||
| return {}; | ||
| } | ||
| if (this.lazyBody === undefined) { | ||
| try { | ||
| this.lazyBody = JSON.parse(this.event.body); | ||
| } catch (error) { | ||
| logger.error(`Failed to parse WebSocket body: ${error}`); | ||
| this.lazyBody = {}; | ||
| } | ||
| } | ||
| return this.lazyBody || {}; | ||
| } | ||
mintae-v6x marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| get connectionId(): string { | ||
| return this.event.requestContext.connectionId; | ||
| } | ||
|
|
||
| get routeKey(): string { | ||
| return this.event.requestContext.routeKey; | ||
| } | ||
|
|
||
| get domainName(): string { | ||
| return this.event.requestContext.domainName; | ||
| } | ||
|
|
||
| get stage(): string { | ||
| return this.event.requestContext.stage; | ||
| } | ||
|
|
||
| // HTTP plugin compatibility methods | ||
|
|
||
| /** | ||
| * For HTTP plugin compatibility (TracerPlugin uses this). | ||
| * WebSocket events may have headers in $connect route (HTTP handshake), | ||
| * but typically don't have headers in other routes. | ||
| */ | ||
| public header(key: string): string | undefined { | ||
| const event = this.event as any; | ||
| if (event.headers) { | ||
| return event.headers[key.toLowerCase()]; | ||
| } | ||
| return undefined; | ||
| } | ||
mintae-v6x marked this conversation as resolved.
Show resolved
Hide resolved
Comment on lines
+56
to
+67
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TracerPlugin 의 begin 함수를 보면 aws sqs에 이벤트 넣을 때 클라이언트 버전을 같이 표기하기 위해서인데... 일단 connect를 제외하고는 trace.push 했을 때 모든 버전이 0.0.0으로 표기되는 문제가 있긴 하겠으나, 아직 웹소켓 trace에서 버전을 주의깊게 봐야할 필요성은 없으므로 간단히 구현합니다.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 제 생각에는 구현이 간단하고 예상되는 부작용도 없기 때문에 포함 안할 이유가 딱히 없어 보입니다. constructor(event: APIGatewayProxyWebsocketEventV2, context: Context) {
// ... 기존 코드
const ev = this.event as any;
if (ev.headers) {
const normalized: Record<string, string | undefined> = {};
for (const key of Object.keys(ev.headers)) {
normalized[key.toLowerCase()] = ev.headers[key];
}
ev.headers = normalized;
}
}이렇게 몇 줄 추가면 |
||
| } | ||
|
|
||
| export interface WebSocketHandlerResponse { | ||
| statusCode: number; | ||
| body?: string; | ||
| } | ||
|
|
||
| export interface WebSocketHandlerContext<A extends WebSocketHandlerAuxBase> { | ||
| request: WebSocketHandlerRequest; | ||
| response: undefined; // For HTTP plugin compatibility (not used in WebSocket handlers) | ||
| aux: A; | ||
| } | ||
|
|
||
| export type WebSocketHandler<A extends WebSocketHandlerAuxBase> = ( | ||
| context: WebSocketHandlerContext<A>, | ||
| ) => WebSocketHandlerResponse | Promise<WebSocketHandlerResponse> | undefined; | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
build.ts 파일과 거의 비슷합니다.
http 핸들러만 websocket 핸들러로 바꾸었습니다.