Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions lib/dynamodb-wrapper.d.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
declare interface IDynamoDBWrapperOptions {
autoPagination?: boolean;
tableNamePrefix?: string;
groupDelayMs?: number;
maxRetries?: number;
Expand All @@ -21,10 +22,12 @@ declare interface IBatchWriteItemOption {

declare interface IQueryOptions {
groupDelayMs?: number;
autoPagination?: boolean;
}

declare interface IScanOptions {
groupDelayMs?: number;
autoPagination?: boolean;
}

declare type TDictionary<T> = { [key: string] : T; };
Expand Down
19 changes: 17 additions & 2 deletions lib/dynamodb-wrapper.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { DynamoDBWrapper } from './dynamodb-wrapper';

describe('lib/dynamodb-wrapper', () => {

function _setupDynamoDBWrapper(options?: IMockDynamoDBOptions) {
function _setupDynamoDBWrapper(options?: IMockDynamoDBOptions, wrapperOptions?: IDynamoDBWrapperOptions) {
options = options || {};
let mockDynamoDB = new MockDynamoDB(options);
return {
Expand All @@ -15,7 +15,8 @@ describe('lib/dynamodb-wrapper', () => {
maxRetries: 2,
retryDelayOptions: {
base: 0
}
},
...wrapperOptions
})
};
}
Expand Down Expand Up @@ -279,6 +280,20 @@ describe('lib/dynamodb-wrapper', () => {
expect(response.ConsumedCapacity).not.toBeDefined();
});

it('should query only once when auto pagination is disabled', async () => {
let params = _setupQueryParams();
let mock = _setupDynamoDBWrapper(null, { autoPagination: false });
let dynamoDB = mock.dynamoDB;
let dynamoDBWrapper = mock.dynamoDBWrapper;

spyOn(dynamoDB, 'query').and.callThrough();
let response = await dynamoDBWrapper.query(params);

expect(dynamoDB.query).toHaveBeenCalledTimes(1);

expect(response.LastEvaluatedKey).toEqual({ id: { S: 'foo' } });
});

it('should aggregate consumed capacity (TOTAL) from multiple responses', async () => {
let params = _setupQueryParams('TOTAL');
let mock = _setupDynamoDBWrapper();
Expand Down
30 changes: 20 additions & 10 deletions lib/dynamodb-wrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export class DynamoDBWrapper {
public groupDelayMs: number;
public maxRetries: number;
public retryDelayOptions: any;
public autoPagination: boolean;

constructor(dynamoDB: any, options?: IDynamoDBWrapperOptions) {
this.dynamoDB = dynamoDB;
Expand All @@ -24,6 +25,8 @@ export class DynamoDBWrapper {
options.retryDelayOptions = options.retryDelayOptions || {};
this.tableNamePrefix = typeof options.tableNamePrefix === 'string' ? options.tableNamePrefix : '';
this.groupDelayMs = getNonNegativeInteger([options.groupDelayMs, 100]);
this.autoPagination = options.autoPagination ?? true;

this.maxRetries = getNonNegativeInteger([options.maxRetries, 10]);
this.retryDelayOptions = {};
this.retryDelayOptions.base = getNonNegativeInteger([options.retryDelayOptions.base, 100]);
Expand Down Expand Up @@ -147,8 +150,9 @@ export class DynamoDBWrapper {
// set default options
options = options || {};
options.groupDelayMs = getNonNegativeInteger([options.groupDelayMs, this.groupDelayMs]);
options.autoPagination = options.autoPagination ?? this.autoPagination;

let responses = await this._queryOrScanHelper('query', params, options.groupDelayMs);
let responses = await this._queryOrScanHelper('query', params, options);
return _makeQueryOrScanResponse(responses);
}

Expand All @@ -168,8 +172,9 @@ export class DynamoDBWrapper {
// set default options
options = options || {};
options.groupDelayMs = getNonNegativeInteger([options.groupDelayMs, this.groupDelayMs]);
options.autoPagination = options.autoPagination ?? this.autoPagination;

let responses = await this._queryOrScanHelper('scan', params, options.groupDelayMs);
let responses = await this._queryOrScanHelper('scan', params, options);
return _makeQueryOrScanResponse(responses);
}

Expand Down Expand Up @@ -215,19 +220,21 @@ export class DynamoDBWrapper {
return _makeBatchWriteItemResponse(tableNames, responsesPerTable, totalRequestItems);
}

private async _queryOrScanHelper(method: string, params: any, groupDelayMs: number): Promise<any> {
private async _queryOrScanHelper(method: string, params: any, { groupDelayMs, autoPagination }: IQueryOptions ): Promise<any> {
let list = [];

// first page of data
let res = await this._callDynamoDB(method, params);
list.push(res);

// make subsequent requests to get remaining pages of data
while (res.LastEvaluatedKey) {
await wait(groupDelayMs);
params.ExclusiveStartKey = res.LastEvaluatedKey;
res = await this._callDynamoDB(method, params);
list.push(res);
if (autoPagination) {
// make subsequent requests to get remaining pages of data
while (res.LastEvaluatedKey) {
await wait(groupDelayMs);
params.ExclusiveStartKey = res.LastEvaluatedKey;
res = await this._callDynamoDB(method, params);
list.push(res);
}
}

return list;
Expand Down Expand Up @@ -406,17 +413,20 @@ function _makeQueryOrScanResponse(responses: any): any {
let count = 0;
let scannedCount = 0;
let items = [];
let lastEvaluatedKey = null;

for (let res of responses) {
count += res.Count;
scannedCount += res.ScannedCount;
lastEvaluatedKey = res.LastEvaluatedKey;
appendArray(items, res.Items);
}

let result: any = {
Count: count,
ScannedCount: scannedCount,
Items: items
Items: items,
LastEvaluatedKey: lastEvaluatedKey
};

if (responses[0].ConsumedCapacity) {
Expand Down
2 changes: 1 addition & 1 deletion lib/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export function appendArray(array1: any[], array2: any[]) {
}
}

export function wait(ms: number): Promise<any> {
export function wait(ms: number): Promise<void> {
return new Promise(resolve => {
setTimeout(() => {
resolve();
Expand Down
Loading