Skip to content

Commit c96b87f

Browse files
committed
handle gcdata jobs in parse queue
1 parent f93dd49 commit c96b87f

File tree

10 files changed

+35
-119
lines changed

10 files changed

+35
-119
lines changed

dev/transferQueue.ts

Lines changed: 0 additions & 17 deletions
This file was deleted.

ecosystem.config.js

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,10 +103,6 @@ let arr = [
103103
name: "monitor",
104104
group: "backend",
105105
},
106-
{
107-
name: "gcdata",
108-
group: "backend",
109-
},
110106
{
111107
name: "buildsets",
112108
group: "backend",

global.d.ts

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -279,11 +279,6 @@ type ProfileJob = {
279279
account_id: number;
280280
};
281281

282-
type GcDataJob = {
283-
match_id: number;
284-
pgroup: PGroup;
285-
};
286-
287282
type CountsJob = ApiData;
288283
type ScenariosJob = {
289284
match_id: number;
@@ -295,6 +290,7 @@ type CacheJob = {
295290
type ParseJob = {
296291
match_id: number;
297292
origin?: DataOrigin;
293+
gcDataOnly?: boolean;
298294
};
299295

300296
type QueueJob = QueueInput["data"];
@@ -320,10 +316,6 @@ type QueueInput =
320316
name: "parse";
321317
data: ParseJob;
322318
}
323-
| {
324-
name: "gcQueue";
325-
data: GcDataJob;
326-
}
327319
| {
328320
name: "cacheQueue";
329321
data: CacheJob;

svc/gcdata.ts

Lines changed: 0 additions & 33 deletions
This file was deleted.

svc/monitor.ts

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ const health: Record<string, () => Promise<Metric>> = {
1717
seqNumDelay,
1818
parseDelay,
1919
fhDelay,
20-
gcDelay,
2120
mmrDelay,
2221
cacheDelay,
2322
scenariosDelay,
@@ -92,15 +91,6 @@ async function parseDelay() {
9291
limit: 10000,
9392
};
9493
}
95-
async function gcDelay() {
96-
const result = await db.raw(
97-
"select count(*) from queue where type = 'gcQueue'",
98-
);
99-
return {
100-
metric: result.rows[0]?.count,
101-
limit: 100000,
102-
};
103-
}
10494
async function fhDelay() {
10595
const result = await db.raw(
10696
"select count(*) from queue where type = 'fhQueue'",
@@ -170,11 +160,11 @@ WHERE keyspace_name = 'yasp';
170160
};
171161
}
172162
async function diskUsage() {
173-
const result = await statfs('/');
163+
const result = await statfs("/");
174164
return {
175165
metric: (result.blocks - result.bavail) * result.bsize,
176166
limit: result.blocks * result.bsize,
177-
}
167+
};
178168
}
179169
async function redisUsage() {
180170
const info = await redis.info();

svc/parser.ts

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ async function parseProcessor(job: ParseJob, metadata: JobMetadata) {
3030
let parseTime = 0;
3131
try {
3232
redisCount("parser_job");
33-
redis.publish(
33+
await redis.publish(
3434
String(metadata.jobId),
3535
c.magenta(
3636
`Starting [job: ${metadata.jobId}] [match: ${job.match_id}] after ${Date.now() - Number(metadata.timestamp)}ms`,
@@ -50,7 +50,7 @@ async function parseProcessor(job: ParseJob, metadata: JobMetadata) {
5050
}
5151

5252
// Fetch the API data
53-
redis.publish(String(metadata.jobId), c.blue(`Fetching API data...`));
53+
await redis.publish(String(metadata.jobId), c.blue(`Fetching API data...`));
5454
const apiStart = Date.now();
5555
// The pgroup is used to update player_caches on insert.
5656
// Since currently gcdata and parse data have no knowledge of anonymity, we pass it from API data
@@ -73,7 +73,7 @@ async function parseProcessor(job: ParseJob, metadata: JobMetadata) {
7373
return false;
7474
}
7575
apiTime = Date.now() - apiStart;
76-
redis.publish(
76+
await redis.publish(
7777
String(metadata.jobId),
7878
c.green(`Fetched API data in ${apiTime}ms`),
7979
);
@@ -90,7 +90,10 @@ async function parseProcessor(job: ParseJob, metadata: JobMetadata) {
9090
// }
9191

9292
// Fetch the gcdata and construct a replay URL
93-
redis.publish(String(metadata.jobId), c.blue(`Fetching replay data...`));
93+
await redis.publish(
94+
String(metadata.jobId),
95+
c.blue(`Fetching replay data...`),
96+
);
9497
const gcStart = Date.now();
9598
const { data: gcMatch, error: gcError } =
9699
await gcFetcher.getOrFetchDataWithRetry(
@@ -107,17 +110,23 @@ async function parseProcessor(job: ParseJob, metadata: JobMetadata) {
107110
return false;
108111
}
109112
gcTime = Date.now() - gcStart;
110-
redis.publish(
113+
await redis.publish(
111114
String(metadata.jobId),
112115
c.green(`Fetched replay data in ${gcTime}ms`),
113116
);
117+
118+
if (job.gcDataOnly) {
119+
await log("skip", "Fetching replay data only, skipping parse");
120+
return true;
121+
}
122+
114123
let url = buildReplayUrl(
115124
gcMatch.match_id,
116125
gcMatch.cluster,
117126
gcMatch.replay_salt,
118127
);
119128

120-
redis.publish(String(metadata.jobId), c.blue(`Parsing replay...`));
129+
await redis.publish(String(metadata.jobId), c.blue(`Parsing replay...`));
121130
const parseStart = Date.now();
122131
const { error: parseError, skipped } = await parsedFetcher.getOrFetchData(
123132
matchId,
@@ -132,7 +141,7 @@ async function parseProcessor(job: ParseJob, metadata: JobMetadata) {
132141
},
133142
);
134143
parseTime = Date.now() - parseStart;
135-
redis.publish(
144+
await redis.publish(
136145
String(metadata.jobId),
137146
c.green(`Parsed replay in ${parseTime}ms`),
138147
);
@@ -182,8 +191,8 @@ async function parseProcessor(job: ParseJob, metadata: JobMetadata) {
182191
job.match_id
183192
} ${displayMsg ?? ""}`,
184193
);
185-
redis.publish("parsed", message);
186-
redis.publish(
194+
await redis.publish("parsed", message);
195+
await redis.publish(
187196
String(metadata.jobId),
188197
c[colors[type]](
189198
`[${type}] Finished [job: ${metadata.jobId}] [match: ${

svc/retriever.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ const CMsgGCMatchDetailsResponse = builder.lookupType(
5757

5858
setInterval(() => {
5959
const shouldRestart =
60-
matchRequests >= clientMap.size * matchesPerAccount ||
60+
matchRequests >= clientMap.size * matchesPerAccount ||
6161
profileRequests - profileSuccesses > 100000 ||
6262
noneReady();
6363
if (config.NODE_ENV !== "development" && shouldRestart && getUptime() > 60) {

svc/store/queue.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,6 @@ export async function addReliableJob(
138138
jobKey = `${name}:${data.match_id}`;
139139
} else if (name === "fhQueue") {
140140
jobKey = `${name}:${data.account_id}`;
141-
} else if (name === "gcQueue") {
142-
jobKey = `${name}:${data.match_id}`;
143141
} else if (name === "scenariosQueue") {
144142
jobKey = `${name}:${data.match_id}`;
145143
} else if (name === "profileQueue") {
@@ -182,7 +180,7 @@ export async function addReliableJob(
182180
name === "parse" ? data.match_id : ""
183181
}`,
184182
);
185-
redis.publish("queue", message);
183+
await redis.publish("queue", message);
186184
}
187185
// This might be undefined if a job with the same key already exists. Try to find it
188186
// May not exist anymore if the job finished in the meantime

svc/util/insert.ts

Lines changed: 11 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ export async function insertMatch(
5656
// Do this after removing anonymous account IDs
5757
const pgroup = options.pgroup ?? getPGroup(match as ApiData);
5858
const trx = await db.transaction();
59+
let doGcData = false;
60+
5961
try {
6062
let isProTier = false;
6163
if ("leagueid" in match && match.leagueid) {
@@ -743,28 +745,15 @@ function updateMatchups(match) {
743745
),
744746
);
745747
}
748+
746749
async function queueGcData(trx: knex.Knex.Transaction) {
747750
// Trigger a request for gcdata
748751
if (
749752
options.origin === "scanner" &&
750753
options.type === "api" &&
751-
"game_mode" in match &&
752-
// Don't get replay URLs for event matches
753-
match.game_mode !== 19 &&
754754
match.match_id % 100 < Number(config.GCDATA_PERCENT)
755755
) {
756-
await addReliableJob(
757-
{
758-
name: "gcQueue",
759-
data: {
760-
match_id: match.match_id,
761-
pgroup,
762-
},
763-
},
764-
{
765-
trx,
766-
},
767-
);
756+
doGcData = true;
768757
}
769758
}
770759

@@ -789,18 +778,7 @@ function updateMatchups(match) {
789778
match.game_mode,
790779
],
791780
);
792-
await addReliableJob(
793-
{
794-
name: "gcQueue",
795-
data: {
796-
match_id: match.match_id,
797-
pgroup,
798-
},
799-
},
800-
{
801-
trx,
802-
},
803-
);
781+
doGcData = true;
804782
}
805783
}
806784

@@ -836,10 +814,6 @@ function updateMatchups(match) {
836814
if (options.skipParse) {
837815
return null;
838816
}
839-
if ("game_mode" in match && match.game_mode === 19) {
840-
// don't parse event matches
841-
return null;
842-
}
843817
// We only auto-parse if this is a fresh match from API
844818
if (!(options.origin === "scanner" && options.type === "api")) {
845819
return null;
@@ -855,6 +829,8 @@ function updateMatchups(match) {
855829
const doParse = hasTrackedPlayer || isLeagueMatch;
856830
if (doParse) {
857831
redisCount("auto_parse");
832+
}
833+
if (doGcData || doParse) {
858834
// By default, lower priority than requests
859835
let priority = PRIORITY.AUTO_DEFAULT;
860836
if (isLeagueMatch) {
@@ -863,6 +839,9 @@ function updateMatchups(match) {
863839
if (hasTrackedPlayer) {
864840
priority = PRIORITY.AUTO_TRACKED_PLAYER;
865841
}
842+
if (doGcData && !doParse) {
843+
priority = PRIORITY.AUTO_GCDATA;
844+
}
866845
// We might have to retry since it might be too soon for the replay
867846
let attempts = 50;
868847
const job = await addReliableJob(
@@ -871,6 +850,7 @@ function updateMatchups(match) {
871850
data: {
872851
match_id: match.match_id,
873852
origin: options.origin,
853+
gcDataOnly: doGcData && !doParse,
874854
},
875855
},
876856
{

svc/util/priority.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,6 @@ export const PRIORITY = {
66
REQUEST_DEFAULT: -2,
77
REQUEST_API_KEY: -1,
88
AUTO_DEFAULT: 0,
9+
AUTO_GCDATA: 1,
910
REQUEST_ALREADY_PARSED: 9,
1011
};

0 commit comments

Comments
 (0)