diff --git a/samples/system-test/testResources.ts b/samples/system-test/testResources.ts index 045b01079..3f5665096 100644 --- a/samples/system-test/testResources.ts +++ b/samples/system-test/testResources.ts @@ -12,6 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +// I don't like that these two files (this plus ".test") are duplicated +// across the two test structures, but because of the tangle of rootDirs +// and package.json "files", it's hard to avoid it. + import * as uuid from 'uuid'; // Returns a shortened UUID that can be used to identify a @@ -76,7 +80,7 @@ export class TestResources { getPrefix(testId?: string): string { if (testId) { return [this.testSuiteId, this.currentTime, normalizeId(testId)].join( - '-' + '-', ); } else { return [this.testSuiteId, this.currentTime].join('-'); @@ -97,7 +101,7 @@ export class TestResources { */ generateBigQueryName(testId: string): string { return [normalizeId(this.getPrefix(testId)), this.tokenMaker.uuid()].join( - '_' + '_', ); } @@ -107,45 +111,39 @@ export class TestResources { */ generateStorageName(testId: string): string { return [normalizeId(this.getPrefix(testId)), this.tokenMaker.uuid()].join( - '_' + '_', ); } - /*! + /** * Given a list of resource names (and a test ID), this will return * a list of all resources that should be deleted to clean up for * the current run of that particular test. - * - * Leaving this commented out for now since it's not actively needed. */ - /*filterForTest(testId: string, allResources: Resource[]): Resource[] { + filterForTest(testId: string, allResources: Resource[]): Resource[] { const prefix = this.getPrefix(testId); return allResources.filter(n => n.name?.includes(prefix)); - }*/ + } - /*! + /** * Given a list of resource names, this will return a list of all * resources that should be deleted to clean up after the current * run of a test suite. - * - * Leaving this commented out for now since it's not actively needed. */ - /*filterForCurrentRun(allResources: Resource[]): Resource[] { + filterForCurrentRun(allResources: Resource[]): Resource[] { const prefix = this.getPrefix(); return allResources.filter(n => n.name?.includes(prefix)); - }*/ + } /** * Given a list of resource names, this will return a list of all * resources that should be deleted to clean up after any run * of the current test suite. Note that some of the names may * still be in use. - * - * Leaving this commented out for now since it's not actively needed. */ - /*filterForSuite(allResources: Resource[]): Resource[] { + filterForSuite(allResources: Resource[]): Resource[] { return allResources.filter(n => n.name?.includes(this.testSuiteId)); - }*/ + } /** * Given a list of resource names, this will return a list of all diff --git a/system-test/pubsub.ts b/system-test/pubsub.ts index 82bff2aa3..44aaff741 100644 --- a/system-test/pubsub.ts +++ b/system-test/pubsub.ts @@ -13,12 +13,10 @@ // limitations under the License. import * as assert from 'assert'; -import {describe, it, before, after, beforeEach} from 'mocha'; +import {describe, it, before, after} from 'mocha'; import * as crypto from 'crypto'; import defer = require('p-defer'); -import * as uuid from 'uuid'; -// This is only in Node 10.17+, but it's used for system tests, should be okay. import {promises as fs} from 'fs'; import { @@ -32,223 +30,178 @@ import { SchemaViews, ISchema, Duration, + Schema, + SubscriptionOptions, } from '../src'; -import {Policy, IamPermissionsMap} from '../src/iam'; import {MessageOptions} from '../src/topic'; -import {google} from '../protos/protos'; - -type Resource = Topic | Subscription | Snapshot; - -const PREFIX = 'gcloud-tests'; -const CURRENT_TIME = Date.now(); +import {TestResources} from '../test/testResources'; +import {GoogleError} from 'google-gax'; const pubsub = new PubSub(); -function shortUUID() { - return uuid.v1().split('-').shift(); +interface UsedTopic { + name: string; + fullName: string; + topic: Topic; } -describe('pubsub', () => { - const TOPIC_NAMES = [ - generateTopicName(), - generateTopicName(), - generateTopicName(), - ]; - - const TOPICS = [ - pubsub.topic(TOPIC_NAMES[0]), - pubsub.topic(TOPIC_NAMES[1]), - pubsub.topic(TOPIC_NAMES[2]), - ]; - - const TOPIC_FULL_NAMES = TOPICS.map(getTopicName); - - function generateName(name: string) { - return [PREFIX, name, shortUUID(), CURRENT_TIME].join('-'); - } +interface UsedSub { + name: string; + fullName: string; + sub: Subscription; +} - function generateSnapshotName() { - return generateName('snapshot'); - } +describe('pubsub', () => { + const resources = new TestResources('ps-sys'); - function generateSubName() { - return generateName('subscription'); + async function generateTopic(test: string): Promise { + const name = resources.generateName(test); + const [topic] = await pubsub.topic(name).get({autoCreate: true}); + const fullName = topic.name; + return {name, topic, fullName}; } - function generateSchemaName() { - return generateName('schema'); + async function generateSub(test: string, topicName: string, opts: SubscriptionOptions = {}): Promise { + const name = resources.generateName(test); + const sub = pubsub.topic(topicName).subscription(name, opts); + await sub.create(); + const fullName = sub.name; + return {name, sub, fullName}; } - function generateSubForDetach() { - return generateSubName(); + function generateSnapshotName(test: string) { + return resources.generateName(`ss-${test}`); } - function generateTopicName() { - return generateName('topic'); + function generateSubName(test: string) { + return resources.generateName(`sub-${test}`); } - function getTopicName(topic: Topic) { - return topic.name.split('/').pop(); + function generateSchemaName(test: string) { + return resources.generateName(`sch-${test}`); } - function deleteTestResource(resource: Resource) { - // Delete resource from current test run. - if (resource.name.includes(CURRENT_TIME.toString())) { - resource.delete(); - return; - } - - // Delete left over resources which is older then 1 hour. - if (!resource.name.includes(PREFIX)) { - return; - } - - const createdAt = Number(resource.name.split('-').pop()); - const timeDiff = (Date.now() - createdAt) / (2 * 1000 * 60 * 60); - - if (timeDiff > 1) { - resource.delete(); - } + function generateTopicName(test: string) { + return resources.generateName(`top-${test}`); } async function deleteTestResources(): Promise { - const topicStream = pubsub.getTopicsStream().on('data', deleteTestResource); - const subscriptionStream = pubsub - .getSubscriptionsStream() - .on('data', deleteTestResource); - const snapshotStream = pubsub - .getSnapshotsStream() - .on('data', deleteTestResource); - - const streams = [topicStream, subscriptionStream, snapshotStream].map( - stream => { - return new Promise((resolve, reject) => { - stream.on('error', reject); - stream.on('end', resolve); - }); - } - ); + const [subs] = await pubsub.getSubscriptions(); + const [topics] = await pubsub.getTopics(); + const [snaps] = await pubsub.getSnapshots(); - // Schemas might be dependencies on topics, so wait for these first. - await Promise.all(streams); - - const allSchemas: Promise[] = []; + // Close out schemas first, since other things may be + // depending on them. + const schemas: Schema[] = []; for await (const s of pubsub.listSchemas()) { - let deleteSchema = false; - const name = s.name; - if (!name) { - continue; - } + schemas.push(pubsub.schema(s.name!)); + } + await Promise.all( + resources.filterForCleanup(schemas).map(x => x.delete?.()) + ); - // Delete resource from current test run. - if (name.includes(CURRENT_TIME.toString())) { - deleteSchema = true; - } else if (name.includes(PREFIX)) { - // Delete left over resources which are older then 1 hour. - const createdAt = Number(s.name?.split('-').pop()); - const timeDiff = (Date.now() - createdAt) / (2 * 1000 * 60 * 60); + // Snapshots. + await Promise.all( + resources.filterForCleanup(snaps).map(x => x.delete?.()) + ) - if (timeDiff > 1) { - deleteSchema = true; - } - } + // Subscriptions next. + await Promise.all( + resources.filterForCleanup(subs).map(x => x.delete?.()) + ); - if (deleteSchema) { - const wrapped = pubsub.schema(name); - allSchemas.push(wrapped.delete()); - } - } - await Promise.all(allSchemas); + // Finally topics. + await Promise.all( + resources.filterForCleanup(topics).map(x => x.delete?.()) + ); } - async function publishPop(message: MessageOptions) { - const topic = pubsub.topic(generateTopicName()); - const subscription = topic.subscription(generateSubName()); - await topic.create(); - await subscription.create(); + async function publishPop(test: string, message: MessageOptions) { + const topic = await generateTopic(test); + const sub = await generateSub(test, topic.name); for (let i = 0; i < 6; i++) { - await topic.publishMessage(message); + await topic.topic.publishMessage(message); } return new Promise((resolve, reject) => { - subscription.on('error', reject); - subscription.once('message', resolve); + sub.sub.on('error', reject); + sub.sub.once('message', resolve); }); } before(async () => { await deleteTestResources(); - - // create all needed topics with metadata - await Promise.all(TOPICS.map(t => t.create())); }); after(() => { - // Delete all created test resources return deleteTestResources(); }); describe('Topic', () => { it('should be listed', async () => { + const testTopics = [ + await generateTopic('should-list'), + await generateTopic('should-list'), + ]; const [topics] = await pubsub.getTopics(); const results = topics.filter(topic => { - const name = getTopicName(topic); - return TOPIC_FULL_NAMES.indexOf(name) !== -1; + return testTopics.findIndex(t => t.fullName === topic.name) >= 0; }); - // get all topics in list of known names - assert.strictEqual(results.length, TOPIC_NAMES.length); + assert.strictEqual(results.length, testTopics.length); }); - it('should list topics in a stream', done => { + it('should list topics in a stream', async () => { + const testTopics = [ + await generateTopic('stream-list'), + await generateTopic('stream-list'), + ]; const topicsEmitted = new Array(); - pubsub - .getTopicsStream() - .on('error', done) - .on('data', (topic: Topic) => { - topicsEmitted.push(topic); - }) - .on('end', () => { - const results = topicsEmitted.filter(topic => { - const name = getTopicName(topic); - return TOPIC_FULL_NAMES.indexOf(name) !== -1; + await new Promise((res, rej) => { + pubsub + .getTopicsStream() + .on('error', rej) + .on('data', (topic: Topic) => { + topicsEmitted.push(topic); + }) + .on('end', () => { + const results = topicsEmitted.filter(topic => { + return testTopics.findIndex(t => t.fullName === topic.name) >= 0; + }); + + assert.strictEqual(results.length, testTopics.length); + res(); }); - - assert.strictEqual(results.length, TOPIC_NAMES.length); - done(); - }); + }); }); it('should allow manual paging', async () => { + const testTopics = [ + await generateTopic('man-page'), + await generateTopic('man-page'), + ]; const [topics] = await pubsub.getTopics({ - pageSize: TOPIC_NAMES.length - 1, + pageSize: 1, gaxOpts: {autoPaginate: false}, }); - assert.strictEqual(topics.length, TOPIC_NAMES.length - 1); + assert.strictEqual(topics.length, 1); }); - it('should be created and deleted', done => { - const TOPIC_NAME = generateTopicName(); - pubsub.createTopic(TOPIC_NAME, err => { - assert.ifError(err); - pubsub.topic(TOPIC_NAME).delete(done); - }); + it('should be created and deleted', async () => { + const testTopic = await generateTopic('c-and-d'); + await testTopic.topic.delete(); }); it('should honor the autoCreate option', done => { - const topic = pubsub.topic(generateTopicName()); + const topic = pubsub.topic(generateTopicName('auto')); topic.get({autoCreate: true}, done); }); - it('should confirm if a topic exists', done => { - const topic = pubsub.topic(TOPIC_NAMES[0]); + it('should confirm if a topic exists', async () => { + const testTopic = await generateTopic('c-e'); + const topic = pubsub.topic(testTopic.name); - topic.exists( - (err: Error | null | undefined, exists: boolean | null | undefined) => { - assert.ifError(err); - assert.strictEqual(exists, true); - done(); - } - ); + const [exists] = await topic.exists(); + assert.strictEqual(exists, true); }); it('should confirm if a topic does not exist', done => { @@ -263,24 +216,15 @@ describe('pubsub', () => { ); }); - it('should publish a message', done => { - const topic = pubsub.topic(TOPIC_NAMES[0]); + it('should publish a message', async () => { + const testTopic = await generateTopic('pub-msg'); + const topic = testTopic.topic; const message = { data: Buffer.from('message from me'), orderingKey: 'a', }; - topic.publishMessage( - message, - ( - err: Error | null | undefined, - messageId: string | null | undefined - ) => { - assert.ifError(err); - assert.strictEqual(typeof messageId, 'string'); - done(); - } - ); + const result = await topic.publishMessage(message); }); it('should publish a message with attributes', async () => { @@ -288,29 +232,23 @@ describe('pubsub', () => { const attributes = { customAttribute: 'value', }; - const message = await publishPop({data, attributes}); + const message = await publishPop('pub-attr', {data, attributes}); assert.deepStrictEqual(message.data, data); assert.deepStrictEqual(message.attributes, attributes); }); - it('should get the metadata of a topic', done => { - const topic = pubsub.topic(TOPIC_NAMES[0]); - topic.getMetadata( - ( - err: ServiceError | null | undefined, - metadata: google.pubsub.v1.ITopic | null | undefined - ) => { - assert.ifError(err); - assert.strictEqual(metadata!.name, topic.name); - done(); - } - ); + it('should get the metadata of a topic', async () => { + const testTopic = await generateTopic('md-topic'); + const topic = testTopic.topic; + const [meta] = await topic.getMetadata(); + assert.strictEqual(meta.name, topic.name); }); it('should set metadata for a topic', async () => { const threeDaysInSeconds = 3 * 24 * 60 * 60; + const testTopic = await generateTopic('md-set'); - const topic = pubsub.topic(TOPIC_NAMES[0]); + const topic = testTopic.topic; await topic.setMetadata({ messageRetentionDuration: { seconds: threeDaysInSeconds, @@ -339,9 +277,10 @@ describe('pubsub', () => { } it('should pass the acceptance tests', async () => { - const [topic] = await pubsub.createTopic(generateName('orderedtopic')); + const testTopic = await generateTopic('ordered'); + const topic = testTopic.topic; const [subscription] = await topic.createSubscription( - generateName('orderedsub'), + generateSubName('ordered'), { enableMessageOrdering: true, } @@ -417,168 +356,146 @@ describe('pubsub', () => { await Promise.all(publishes); await deferred.promise; - await Promise.all([topic.delete(), subscription.delete()]); }); }); }); describe('Subscription', () => { - const TOPIC_NAME = generateTopicName(); - const topic = pubsub.topic(TOPIC_NAME); - - const SUB_NAMES = [generateSubName(), generateSubName()]; - const SUB_DETACH_NAME = generateSubForDetach(); - - const thirty = Duration.from({minutes: 30}); - const sixty = Duration.from({minutes: 60}); - const SUBSCRIPTIONS = [ - topic.subscription(SUB_NAMES[0], {minAckDeadline: thirty, maxAckDeadline: thirty}), - topic.subscription(SUB_NAMES[1], {minAckDeadline: sixty, maxAckDeadline: sixty}), - topic.subscription(SUB_DETACH_NAME, {minAckDeadline: thirty, maxAckDeadline: thirty}), - ]; - - before(async () => { - await topic.create(); - await Promise.all(SUBSCRIPTIONS.map(s => s.create())); + async function subPop(testName: string, count: number) { + const testTopic = await generateTopic(testName); + const topic = testTopic.topic; + + const testSubProms: Promise[] = []; + for (let i = 0; i < count; i++) { + testSubProms.push(generateSub(testName, testTopic.name, { + minAckDeadline: Duration.from({seconds: 60}), + maxAckDeadline: Duration.from({seconds: 60}), + })); + } + const testSubs = await Promise.all(testSubProms); + const subs = testSubs.map(t => t.sub); for (let i = 0; i < 10; i++) { const data = Buffer.from('hello'); await topic.publishMessage({data}); } await new Promise(r => setTimeout(r, 2500)); - }); - after(() => { - // Delete subscriptions - return Promise.all( - SUBSCRIPTIONS.map(async s => { - try { - await s.delete(); - } catch (e) { - await topic.delete(); - } - }) - ); - }); + return { + testTopic, + topic, + testSubs, + subs, + }; + } - it('should return error if creating an existing subscription', done => { - // Use a new topic name... - const topic = pubsub.topic(generateTopicName()); + it('should return error if creating an existing subscription', async () => { + // Use a new topic... + const topic = await generateTopic('sub-dup'); - // ...but with the same subscription name that we already created... - const subscription = topic.subscription(SUB_NAMES[0]); + // And make a sub... + const existing = await generateSub('sub-dup', topic.name); - subscription.create(err => { - if (!err) { - assert.fail('Should not have created subscription successfully.'); - } + // ...another but with the same subscription name that we already created... + const subscription = topic.topic.subscription(existing.name); + try { + await subscription.create(); + assert.fail('Should not have created subscription successfully.'); + } catch (e) { // ...and it should fail, because the subscription name is unique to the // project, and not the topic. + const err = e as GoogleError; assert.strictEqual(err!.code, 6); - done(); - }); + } }); - it('should list all subscriptions registered to the topic', done => { - topic.getSubscriptions( - ( - err: Error | null | undefined, - subs: Subscription[] | null | undefined - ) => { - assert.ifError(err); - assert.strictEqual(subs!.length, SUBSCRIPTIONS.length); - assert(subs![0] instanceof Subscription); - done(); - } - ); + it('should list all subscriptions registered to the topic', async () => { + const pop = await subPop('list-subs', 2); + const [subs] = await pop.topic.getSubscriptions(); + assert.strictEqual(subs!.length, 2); + assert(subs![0] instanceof Subscription); }); - it('should list all topic subscriptions as a stream', done => { - const subscriptionsEmitted: Array<{}> = []; - topic - .getSubscriptionsStream() - .on('error', done) - .on('data', (subscription: {}) => { - subscriptionsEmitted.push(subscription); - }) - .on('end', () => { - assert.strictEqual(subscriptionsEmitted.length, SUBSCRIPTIONS.length); - done(); - }); + it('should list all topic subscriptions as a stream', async () => { + const pop = await subPop('list-subs', 2); + + await new Promise((res, rej) => { + const subscriptionsEmitted: Array<{}> = []; + pop.topic + .getSubscriptionsStream() + .on('error', rej) + .on('data', (subscription: {}) => { + subscriptionsEmitted.push(subscription); + }) + .on('end', () => { + assert.strictEqual(subscriptionsEmitted.length, 2); + res(); + }); + }); }); - it('should list all subscriptions regardless of topic', done => { - pubsub.getSubscriptions( - (err: ServiceError | null, subscriptions?: Subscription[] | null) => { - assert.ifError(err); - assert(subscriptions instanceof Array); - done(); - } - ); + it('should list all subscriptions regardless of topic', async () => { + // Make sure there are some subs. + await subPop('all-subs', 1); + + const [results] = await pubsub.getSubscriptions(); + assert(results instanceof Array); }); - it('should list all subscriptions as a stream', done => { - let subscriptionEmitted = false; - - pubsub - .getSubscriptionsStream() - .on('error', done) - .on('data', (subscription: Subscription) => { - subscriptionEmitted = subscription instanceof Subscription; - }) - .on('end', () => { - assert.strictEqual(subscriptionEmitted, true); - done(); - }); + it('should list all subscriptions as a stream', async () => { + // Make sure there are some subs. + await subPop('all-subs', 1); + + await new Promise((res, rej) => { + let subscriptionEmitted = false; + + pubsub + .getSubscriptionsStream() + .on('error', rej) + .on('data', (subscription: Subscription) => { + subscriptionEmitted = subscription instanceof Subscription; + }) + .on('end', () => { + assert.strictEqual(subscriptionEmitted, true); + res(); + }); + }); }); - it('should allow creation and deletion of a subscription', done => { - const subName = generateSubName(); - topic.createSubscription( - subName, - ( - err: Error | null | undefined, - sub: Subscription | null | undefined - ) => { - assert.ifError(err); - assert(sub instanceof Subscription); - sub!.delete(done); - } - ); + it('should allow creation and deletion of a subscription', async () => { + const testTopic = await generateTopic('c-d-sub'); + const subName = generateSubName('c-d-sub'); + const [sub] = await testTopic.topic.createSubscription(subName); + assert(sub instanceof Subscription); + await sub.delete(); }); - it('should honor the autoCreate option', done => { - const sub = topic.subscription(generateSubName()); + it('should honor the autoCreate option', async () => { + const testTopic = await generateTopic('auto-c'); + const sub = testTopic.topic.subscription(generateSubName('auto-c')); - sub.get({autoCreate: true}, done); + await sub.get({autoCreate: true}); }); - it('should confirm if a sub exists', done => { - const sub = topic.subscription(SUB_NAMES[0]); + it('should confirm if a sub exists', async () => { + const testTopic = await generateTopic('exists'); + const testSub = await generateSub('exists', testTopic.name); + const sub = testSub.sub; - sub.exists( - (err: Error | null | undefined, exists: boolean | null | undefined) => { - assert.ifError(err); - assert.strictEqual(exists, true); - done(); - } - ); + const [exists] = await sub.exists(); + assert.strictEqual(exists, true); }); - it('should confirm if a sub does not exist', done => { - const sub = topic.subscription('should-not-exist'); + it('should confirm if a sub does not exist', async () => { + const testTopic = await generateTopic('dne'); + const sub = testTopic.topic.subscription('should-not-exist'); - sub.exists( - (err: Error | null | undefined, exists: boolean | null | undefined) => { - assert.ifError(err); - assert.strictEqual(exists, false); - done(); - } - ); + const [exists] = await sub.exists(); + assert.strictEqual(exists, false); }); - it('should create a subscription with message retention', done => { - const subName = generateSubName(); + it('should create a subscription with message retention', async () => { + const subName = generateSubName('sub-ret'); const threeDaysInSeconds = 3 * 24 * 60 * 60; const callOptions = { messageRetentionDuration: threeDaysInSeconds, @@ -586,183 +503,171 @@ describe('pubsub', () => { name: '', }; - topic.createSubscription( - subName, - callOptions, - ( - err: Error | null | undefined, - sub: Subscription | null | undefined - ) => { - assert.ifError(err); - - sub!.getMetadata( - ( - err: Error | null | undefined, - metadata: google.pubsub.v1.ISubscription | null | undefined - ) => { - assert.ifError(err); - - assert.strictEqual( - Number(metadata!.messageRetentionDuration!.seconds), - threeDaysInSeconds - ); - assert.strictEqual( - Number(metadata!.messageRetentionDuration!.nanos), - 0 - ); - - sub!.delete(done); - } - ); - } + const testTopic = await generateTopic('msg-ret'); + const [sub] = await testTopic.topic.createSubscription(subName, callOptions); + const [metadata] = await sub.getMetadata(); + assert.strictEqual( + Number(metadata!.messageRetentionDuration!.seconds), + threeDaysInSeconds + ); + assert.strictEqual( + Number(metadata!.messageRetentionDuration!.nanos), + 0 ); }); - it('should set metadata for a subscription', () => { - const subscription = topic.subscription(generateSubName()); + it('should set metadata for a subscription', async () => { + const testTopic = await generateTopic('met-sub'); + const subscription = testTopic.topic.subscription(generateSubName('met-sub')); const threeDaysInSeconds = 3 * 24 * 60 * 60; - return subscription - .create() - .then(() => { - return subscription.setMetadata({ - messageRetentionDuration: threeDaysInSeconds, - }); - }) - .then(() => { - return subscription.getMetadata(); - }) - .then(([metadata]) => { - const {seconds, nanos} = metadata.messageRetentionDuration!; - - assert.strictEqual(Number(seconds), threeDaysInSeconds); - assert.strictEqual(Number(nanos), 0); - }); + await subscription.create(); + await subscription.setMetadata({ + messageRetentionDuration: threeDaysInSeconds, + }); + const [metadata] = await subscription.getMetadata(); + const {seconds, nanos} = metadata.messageRetentionDuration!; + + assert.strictEqual(Number(seconds), threeDaysInSeconds); + assert.strictEqual(Number(nanos), 0); }); - it('should error when using a non-existent subscription', done => { - const subscription = topic.subscription(generateSubName()); + it('should error when using a non-existent subscription', async () => { + const testTopic = await generateTopic('dne-sub'); + const subscription = testTopic.topic.subscription(generateSubName('dne-sub')); - subscription.on('error', (err: {code: number}) => { - assert.strictEqual(err.code, 5); - subscription.close(done); - }); + await new Promise((res, rej) => { + subscription.on('error', (err: {code: number}) => { + assert.strictEqual(err.code, 5); + subscription.close(res); + }); - subscription.on('message', () => { - done(new Error('Should not have been called.')); + subscription.on('message', () => { + rej(new Error('Should not have been called.')); + }); }); }); - it('should receive the published messages', done => { + it('should receive the published messages', async () => { + const pop = await subPop('recv', 1); let messageCount = 0; - const subscription = topic.subscription(SUB_NAMES[1]); + const subscription = pop.subs[0]; - subscription.on('error', done); + await new Promise((res, rej) => { + subscription.on('error', rej); - // eslint-disable-next-line @typescript-eslint/no-explicit-any - subscription.on('message', (message: {data: any}) => { - assert.deepStrictEqual(message.data, Buffer.from('hello')); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + subscription.on('message', message => { + assert.deepStrictEqual(message.data, Buffer.from('hello')); + message.ack(); - if (++messageCount === 10) { - subscription.close(done); - } + if (++messageCount === 10) { + subscription.close(res); + } + }); }); }); - it('should ack the message', done => { - const subscription = topic.subscription(SUB_NAMES[1]); + it('should ack the message', async () => { + const pop = await subPop('ack', 1); + const subscription = pop.subs[0]; - let finished = false; - subscription.on('error', () => { - if (!finished) { - finished = true; - subscription.close(done); - } - }); - subscription.on('message', ack); + await new Promise((res, rej) => { + let finished = false; + subscription.on('error', () => { + if (!finished) { + finished = true; + subscription.close(rej); + } + }); + subscription.on('message', ack); - function ack(message: Message) { - if (!finished) { - finished = true; - message.ack(); - subscription.close(done); + function ack(message: Message) { + if (!finished) { + finished = true; + message.ack(); + subscription.close(res); + } } - } + }); }); - it('should nack the message', done => { - const subscription = topic.subscription(SUB_NAMES[1]); + it('should nack the message', async () => { + const pop = await subPop('nack', 1); + const subscription = pop.subs[0]; - let finished = false; - subscription.on('error', () => { - if (!finished) { - finished = true; - subscription.close(done); - } - }); - subscription.on('message', nack); + await new Promise((res, rej) => { + let finished = false; + subscription.on('error', () => { + if (!finished) { + finished = true; + subscription.close(rej); + } + }); + subscription.on('message', nack); - function nack(message: Message) { - if (!finished) { - finished = true; - message.nack(); - subscription.close(done); + function nack(message: Message) { + if (!finished) { + finished = true; + message.nack(); + subscription.close(res); + } } - } + }); }); - it('should respect flow control limits', done => { + it('should respect flow control limits', async () => { const maxMessages = 3; let messageCount = 0; - const subscription = topic.subscription(SUB_NAMES[0], { + const pop = await subPop('fcl', 1); + const subscription = pop.topic.subscription(pop.testSubs[0].name, { flowControl: {maxMessages, allowExcessMessages: false}, }); - subscription.on('error', done); - subscription.on('message', onMessage); + await new Promise((res, rej) => { + subscription.on('error', rej); + subscription.on('message', onMessage); - function onMessage() { - if (++messageCount < maxMessages) { - return; - } + function onMessage() { + if (++messageCount < maxMessages) { + return; + } - subscription.close(done); - } + subscription.close(res); + } + }); }); - it('should send and receive large messages', done => { - const subscription = topic.subscription(SUB_NAMES[0]); + it('should send and receive large messages', async () => { + const pop = await subPop('large', 1); + const subscription = pop.subs[0]; const data = crypto.randomBytes(9000000); // 9mb - topic.publishMessage( - {data}, - (err: ServiceError | null, messageId: string | null | undefined) => { - assert.ifError(err); - - subscription.on('error', done).on('message', (message: Message) => { - if (message.id !== messageId) { - return; - } + const messageId = await pop.topic.publishMessage({data}); + await new Promise((res, rej) => { + subscription.on('error', rej).on('message', (message: Message) => { + if (message.id !== messageId) { + return; + } - assert.deepStrictEqual(data, message.data); - subscription.close(done); - }); - } - ); + assert.deepStrictEqual(data, message.data); + subscription.close(res); + }); + }); }); it('should detach subscriptions', async () => { - const subscription = topic.subscription(SUB_DETACH_NAME); + const pop = await subPop('detach', 1); + const subscription = pop.subs[0]; const [before] = await subscription.detached(); assert.strictEqual(before, false); - await pubsub.detachSubscription(SUB_DETACH_NAME); + await pubsub.detachSubscription(subscription.name); const [after] = await subscription.detached(); assert.strictEqual(after, true); }); // can be ran manually to test options/memory usage/etc. - // tslint:disable-next-line ban it.skip('should handle a large volume of messages', async function () { const MESSAGES = 200000; @@ -773,7 +678,9 @@ describe('pubsub', () => { this.timeout(0); - const subscription = topic.subscription(SUB_NAMES[0]); + const pop = await subPop('many', 1); + const topic = pop.topic; + const subscription = pop.subs[0]; topic.setPublishOptions({batching: {maxMessages: 999}}); await publish(MESSAGES); @@ -839,23 +746,18 @@ describe('pubsub', () => { }); describe('IAM', () => { - it('should get a policy', done => { - const topic = pubsub.topic(TOPIC_NAMES[0]); + it('should get a policy', async () => { + const testTopic = await generateTopic('get-pol'); + const topic = testTopic.topic; - topic.iam.getPolicy( - (err: ServiceError | null, policy: Policy | null | undefined) => { - assert.ifError(err); - - assert.deepStrictEqual(policy!.bindings, []); - assert.strictEqual(policy!.version, 0); - - done(); - } - ); + const [policy] = await topic.iam.getPolicy(); + assert.deepStrictEqual(policy!.bindings, []); + assert.strictEqual(policy!.version, 0); }); - it('should set a policy', done => { - const topic = pubsub.topic(TOPIC_NAMES[0]); + it('should set a policy', async () => { + const testTopic = await generateTopic('set-pol'); + const topic = testTopic.topic; const policy = { bindings: [ { @@ -867,118 +769,98 @@ describe('pubsub', () => { ], }; - topic.iam.setPolicy( - policy, - (err: ServiceError | null, newPolicy?: Policy | null) => { - assert.ifError(err); - const expectedBindings = policy.bindings.map(binding => - Object.assign({condition: null}, binding) - ); - assert.deepStrictEqual(newPolicy!.bindings, expectedBindings); - done(); - } + const [newPolicy] = await topic.iam.setPolicy(policy); + const expectedBindings = policy.bindings.map(binding => + Object.assign({condition: null}, binding) ); + assert.deepStrictEqual(newPolicy!.bindings, expectedBindings); }); - it('should test the iam permissions', done => { - const topic = pubsub.topic(TOPIC_NAMES[0]); + it('should test the iam permissions', async () => { + const testTopic = await generateTopic('set-pol'); + const topic = testTopic.topic; const testPermissions = ['pubsub.topics.get', 'pubsub.topics.update']; - topic.iam.testPermissions( - testPermissions, - ( - err: ServiceError | null, - permissions: IamPermissionsMap | null | undefined - ) => { - assert.ifError(err); - assert.deepStrictEqual(permissions, { - 'pubsub.topics.get': true, - 'pubsub.topics.update': true, - }); - done(); - } - ); + const [permissions] = await topic.iam.testPermissions(testPermissions); + assert.deepStrictEqual(permissions, { + 'pubsub.topics.get': true, + 'pubsub.topics.update': true, + }); }); }); describe('Snapshot', () => { - const SNAPSHOT_NAME = generateSnapshotName(); + async function snapshotPop(test: string) { + const topic: Topic = (await generateTopic('snap')).topic; + const subscription: Subscription = (await generateSub('snap', topic.name)).sub; + const snapshotId: string = generateSnapshotName('snap'); + const snapshot: Snapshot = subscription.snapshot(snapshotId); + + await snapshot.create(); + + return { + topic, + subscription, + snapshotId, + snapshot, + }; + } - let topic: Topic; - let subscription: Subscription; - let snapshot: Snapshot; function getSnapshotName({name}: {name: string}) { return name.split('/').pop(); } before(async () => { - topic = pubsub.topic(generateTopicName()); - subscription = topic.subscription(generateSubName()); - snapshot = subscription.snapshot(SNAPSHOT_NAME); - - await topic.create(); - await subscription.create(); - await snapshot.create(); + await deleteTestResources(); }); after(async () => { - await snapshot.delete(); - await subscription.delete(); - await topic.delete(); + await deleteTestResources(); }); - it('should get a list of snapshots', done => { - pubsub.getSnapshots( - ( - err: Error | null | undefined, - snapshots: Snapshot[] | null | undefined - ) => { - assert.ifError(err); - assert(snapshots!.length > 0); - const names = snapshots!.map(getSnapshotName); - assert(names.includes(SNAPSHOT_NAME)); - done(); - } - ); + it('should get a list of snapshots', async () => { + const pop = await snapshotPop('list'); + const [snapshots] = await pubsub.getSnapshots(); + assert(snapshots!.length > 0); + const names = snapshots!.map(getSnapshotName); + assert(names.includes(pop.snapshotId)); }); - it('should get a list of snapshots as a stream', done => { - const snapshots = new Array(); - pubsub - .getSnapshotsStream() - .on('error', done) - .on('data', (snapshot: Snapshot) => snapshots.push(snapshot)) - .on('end', () => { - assert(snapshots.length > 0); - const names = snapshots.map(getSnapshotName); - assert(names.includes(SNAPSHOT_NAME)); - done(); - }); + it('should get a list of snapshots as a stream', async () => { + const pop = await snapshotPop('list-s'); + const snapshots = await new Promise((res, rej) => { + const snaps = new Array(); + pubsub + .getSnapshotsStream() + .on('error', rej) + .on('data', (snapshot: Snapshot) => snaps.push(snapshot)) + .on('end', () => { + res(snaps); + }); + }); + assert(snapshots.length > 0); + const names = snapshots.map(getSnapshotName); + assert(names.includes(pop.snapshotId)); }); describe('seeking', () => { - let subscription: Subscription; - let snapshot: Snapshot; - let messageId: string; - let errorPromise: Promise<{}>; - - beforeEach(async () => { - subscription = topic.subscription(generateSubName()); - snapshot = subscription.snapshot(generateSnapshotName()); - - await subscription.create(); - await snapshot.create(); - - errorPromise = new Promise((_, reject) => - subscription.on('error', reject) + async function seekPop(test: string) { + const pop = await snapshotPop(test); + const errorPromise = new Promise((_, reject) => + pop.subscription.on('error', reject) ); - }); + + return { + errorPromise, + ...pop, + }; + } // This creates a Promise that hooks the 'message' callback of the // subscription above, and resolves when that callback calls `resolve`. type WorkCallback = (arg: Message, resolve: Function) => void; - function makeMessagePromise(workCallback: WorkCallback): Promise { + function makeMessagePromise(subscription: Subscription, workCallback: WorkCallback): Promise { return new Promise(resolve => { subscription.on('message', (arg: Message) => { workCallback(arg, resolve); @@ -986,15 +868,18 @@ describe('pubsub', () => { }); } - async function publishTestMessage() { - messageId = await topic.publish(Buffer.from('Hello, world!')); + async function publishTestMessage(topic: Topic) { + return await topic.publishMessage({data: Buffer.from('Hello, world!')}); } it('should seek to a snapshot', async () => { + const pop = await seekPop('sn-seek'); let messageCount = 0; type EventParameter = {id: string; ack: () => void}; + let messageId: string; const messagePromise = makeMessagePromise( + pop.subscription, async (message: EventParameter, resolve) => { if (message.id !== messageId) { return; @@ -1002,27 +887,30 @@ describe('pubsub', () => { message.ack(); if (++messageCount === 1) { - await snapshot!.seek(); + await pop.snapshot.seek(); return; } assert.strictEqual(messageCount, 2); - await subscription.close(); + await pop.subscription.close(); resolve(); } ); - await publishTestMessage(); - await Promise.race([errorPromise, messagePromise]); + messageId = await publishTestMessage(pop.topic); + await Promise.race([pop.errorPromise, messagePromise]); }); it('should seek to a date', async () => { + const pop = await seekPop('sn-seek-date'); let messageCount = 0; // eslint-disable-next-line @typescript-eslint/no-explicit-any type EventParameter = {id: string; ack: () => void; publishTime: any}; + let messageId: string; const messagePromise = makeMessagePromise( + pop.subscription, async (message: EventParameter, resolve) => { if (message.id !== messageId) { return; @@ -1031,7 +919,7 @@ describe('pubsub', () => { message.ack(); if (++messageCount === 1) { - subscription.seek( + pop.subscription.seek( message.publishTime, (err: ServiceError | null) => { assert.ifError(err); @@ -1041,43 +929,45 @@ describe('pubsub', () => { } assert.strictEqual(messageCount, 2); - await subscription.close(); + await pop.subscription.close(); resolve(); } ); - await publishTestMessage(); - await Promise.race([errorPromise, messagePromise]); + messageId = await publishTestMessage(pop.topic); + await Promise.race([pop.errorPromise, messagePromise]); }); it('should seek to a future date (purge)', async () => { + const pop = await seekPop('sn-purge'); const testText = 'Oh no!'; - await publishTestMessage(); + await publishTestMessage(pop.topic); // Forward-seek to remove any messages from the queue (those were // placed there in before()). // // We... probably won't be using this in 3000? - await subscription.seek(new Date('3000-01-01')); + await pop.subscription.seek(new Date('3000-01-01')); // Drop a second message and make sure it's the right ID. - await topic.publish(Buffer.from(testText)); + await pop.topic.publishMessage({data: Buffer.from(testText)}); type EventParameter = {data: {toString: () => string}; ack: () => void}; const messagePromise = makeMessagePromise( + pop.subscription, async (message: EventParameter, resolve) => { // If we get the default message from before() then this fails. assert.equal(message.data.toString(), testText); message.ack(); - await subscription.close(); + await pop.subscription.close(); resolve(); } ); - await Promise.race([errorPromise, messagePromise]); + await Promise.race([pop.errorPromise, messagePromise]); }); }); }); @@ -1106,21 +996,21 @@ describe('pubsub', () => { return schemaDef; }; - const setupTestSchema = async () => { + const setupTestSchema = async (test: string) => { const schemaDef = await getSchemaDef(); - const schemaId = generateSchemaName(); + const schemaId = generateSchemaName(test); await pubsub.createSchema(schemaId, SchemaTypes.Avro, schemaDef); return schemaId; }; it('should create a schema', async () => { - const schemaId = await setupTestSchema(); + const schemaId = await setupTestSchema('sc-create'); const schemaList = await aiToArray(pubsub.listSchemas(), schemaId); assert.strictEqual(schemaList.length, 1); }); it('should delete a schema', async () => { - const schemaId = await setupTestSchema(); + const schemaId = await setupTestSchema('sc-del'); // Validate that we created one, because delete() doesn't throw, and we // might end up causing a false negative. @@ -1134,7 +1024,7 @@ describe('pubsub', () => { }); it('should list schemas', async () => { - const schemaId = await setupTestSchema(); + const schemaId = await setupTestSchema('sc-list'); const basicList = await aiToArray( pubsub.listSchemas(SchemaViews.Basic), @@ -1152,7 +1042,7 @@ describe('pubsub', () => { }); it('should get a schema', async () => { - const schemaId = await setupTestSchema(); + const schemaId = await setupTestSchema('sc-get'); const schema = pubsub.schema(schemaId); const info = await schema.get(SchemaViews.Basic); assert.strictEqual(info.definition, ''); diff --git a/test/testResources.test.ts b/test/testResources.test.ts new file mode 100644 index 000000000..b1cf3a673 --- /dev/null +++ b/test/testResources.test.ts @@ -0,0 +1,84 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {describe, it, beforeEach} from 'mocha'; +import {TestResources} from './testResources'; +import * as assert from 'assert'; + +describe('testResources (unit)', () => { + const fixedId = 'fixed'; + const fixedTime = Date.now(); + const fakeTokenMaker = { + uuid: () => fixedId, + timestamp: () => fixedTime, + }; + + const suiteId = 'someSuite'; + let testResources!: TestResources; + + beforeEach(() => { + testResources = new TestResources(suiteId, fakeTokenMaker); + }); + + it('has predictable prefixes', () => { + const prefix = testResources.getPrefix('testId'); + assert.strictEqual(prefix, `${suiteId}-${fixedTime}-testId`); + + const normalizedPrefix = testResources.getPrefix('test-id-dashes'); + assert.strictEqual( + normalizedPrefix, + `${suiteId}-${fixedTime}-test_id_dashes`, + ); + + const suitePrefix = testResources.getPrefix(); + assert.strictEqual(suitePrefix, `${suiteId}-${fixedTime}`); + }); + + it('generates names', () => { + const prefix = testResources.getPrefix('testId'); + const name = testResources.generateName('testId'); + assert.strictEqual(name, `${prefix}-${fixedId}`); + }); + + it('filters for cleanup', () => { + const resources = [ + { + // Not related + name: 'ooga', + }, + { + // For current test run + name: `${suiteId}-${fixedTime}-bob-98719284791`, + }, + { + // For previous test run, but not very old + name: `${suiteId}-${fixedTime - 100}-bob-124897912`, + }, + { + // For previous test run, but old + name: `${suiteId}-${fixedTime - 3000 * 60 * 60}-bob-57823975`, + }, + ]; + const filtered = testResources.filterForCleanup(resources); + assert.strictEqual(filtered.length, 2); + assert.strictEqual( + 1, + filtered.filter(r => r.name?.includes('bob-9871')).length, + ); + assert.strictEqual( + 1, + filtered.filter(r => r.name?.includes('bob-5782')).length, + ); + }); +}); diff --git a/test/testResources.ts b/test/testResources.ts new file mode 100644 index 000000000..05550606d --- /dev/null +++ b/test/testResources.ts @@ -0,0 +1,182 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// I don't like that these two files (this plus ".test") are duplicated +// across the two test structures, but because of the tangle of rootDirs +// and package.json "files", it's hard to avoid it. + +import * as uuid from 'uuid'; + +// Returns a shortened UUID that can be used to identify a +// specific run of a specific test. +function shortUUID() { + return uuid.v4().split('-').shift()!; +} + +export interface TokenMaker { + uuid(): string; + timestamp(): number; +} + +export const defaultMaker = { + uuid: shortUUID, + timestamp: () => Date.now(), +}; + +export interface Resource { + name?: string | null | undefined; + delete?(): Promise; +} + +function normalizeId(id: string): string { + return id.replace(/-/g, '_'); +} + +/** + * Manages the names of testing resources during a test run. It's + * easily to accidentally leak resources, and it's easy to accidentally + * have conflicts with tests running concurrently, so this class helps + * you manage them. + * + * Used nomenclature: + * Test - a single test for a single aspect of code; for example, + * "create a topic in pub/sub" + * Test Suite - a collection of tests that are generally run together; + * for example, "test topic operations in pub/sub" + * Test Run - a single run of a test suite (or single test within a suite); + * for example, "run the tests for PR #1234, 5th attempt" + */ +export class TestResources { + testSuiteId: string; + currentTime: string; + tokenMaker: TokenMaker; + + /** + * @param testSuiteId [string] A unique ID for a test suite (e.g. + * pubsub-topics). + */ + constructor(testSuiteId: string, tokenMaker: TokenMaker = defaultMaker) { + this.testSuiteId = normalizeId(testSuiteId); + this.currentTime = `${tokenMaker.timestamp()}`; + this.tokenMaker = tokenMaker; + } + + /** + * Returns the resource prefix for the current run of the test suite. + * Optionally, testId may specify the specific ID of a test in the + * suite. + */ + getPrefix(testId?: string): string { + if (testId) { + return [this.testSuiteId, this.currentTime, normalizeId(testId)].join( + '-', + ); + } else { + return [this.testSuiteId, this.currentTime].join('-'); + } + } + + /** + * Generates a unique resource name for one run of a test within + * a test suite. + */ + generateName(testId: string): string { + return [this.getPrefix(testId), this.tokenMaker.uuid()].join('-'); + } + + /** + * Generates a unique resource name for one run of a test within + * a test suite for BigQuery resources. + */ + generateBigQueryName(testId: string): string { + return [normalizeId(this.getPrefix(testId)), this.tokenMaker.uuid()].join( + '_', + ); + } + + /** + * Generates a unique resource name for one run of a test within + * a test suite for Cloud Storage resources. + */ + generateStorageName(testId: string): string { + return [normalizeId(this.getPrefix(testId)), this.tokenMaker.uuid()].join( + '_', + ); + } + + /** + * Given a list of resource names (and a test ID), this will return + * a list of all resources that should be deleted to clean up for + * the current run of that particular test. + */ + filterForTest(testId: string, allResources: Resource[]): Resource[] { + const prefix = this.getPrefix(testId); + return allResources.filter(n => n.name?.includes(prefix)); + } + + /** + * Given a list of resource names, this will return a list of all + * resources that should be deleted to clean up after the current + * run of a test suite. + */ + filterForCurrentRun(allResources: Resource[]): Resource[] { + const prefix = this.getPrefix(); + return allResources.filter(n => n.name?.includes(prefix)); + } + + /** + * Given a list of resource names, this will return a list of all + * resources that should be deleted to clean up after any run + * of the current test suite. Note that some of the names may + * still be in use. + */ + filterForSuite(allResources: Resource[]): Resource[] { + return allResources.filter(n => n.name?.includes(this.testSuiteId)); + } + + /** + * Given a list of resource names, this will return a list of all + * resources that should be deleted to generally clean up after any + * run of the current test suite. This is much like filterForSuite(), + * but it also filters by age - items that are less than 2 hours + * old will not be cleaned. + */ + filterForCleanup(allResources: Resource[]): Resource[] { + const currentRunPrefix = this.getPrefix(); + return allResources.filter(n => { + let name = n.name || undefined; + if (name === undefined) { + return false; + } + + // We'll always get at least one thing. + name = name.split('/').pop()!; + + if (name.startsWith(currentRunPrefix)) { + return true; + } + + if (name.startsWith(this.testSuiteId)) { + const parts = name.split('-'); + const createdAt = Number(parts[1]); + const timeDiff = (Date.now() - createdAt) / (1000 * 60 * 60); + if (timeDiff >= 2) { + return true; + } + } + + return false; + }); + } +}