24 commits
0d7ad49
Europe PMC data ingestion script
ashwinisukale May 13, 2025
f28ca74
process europepmc row
ashwinisukale Jun 2, 2025
ee13850
Process CSV files from local
ashwinisukale Jun 3, 2025
86a9ca9
Update as per changed CSV column names
ashwinisukale Jun 6, 2025
6420bd0
500 citations per activity
ashwinisukale Jun 13, 2025
168e63d
Fixed issue with memory heap
ashwinisukale Jun 16, 2025
d230e0a
Corrected name
ashwinisukale Jun 17, 2025
29dded9
Fetch files from s3 and process them
ashwinisukale Jun 17, 2025
1211d02
rename method and use s3
kaysiz Jun 17, 2025
8811820
Update importer to process all unprocessed activity logs
kaysiz Jun 22, 2025
6ca99c8
Add method to process s3 files and excluded already processed ones
kaysiz Jun 22, 2025
102017a
Add model for czi_files table
kaysiz Jun 22, 2025
840a5ab
Add eupmcfile to process the files from s3
kaysiz Jun 22, 2025
33db726
fix doibaseurl reference
kaysiz Jun 22, 2025
1686f1e
Update processing of eupmc activity logs in parallel and handle publi…
kaysiz Jun 23, 2025
6cfb878
Add repository handling for crossref records
kaysiz Jun 23, 2025
f2491f6
update parallel processes number and update activity log when it is d…
kaysiz Aug 11, 2025
766c690
Add logs volume and prod db for v4
kaysiz Aug 11, 2025
6e896ca
update docker image for local dev
kaysiz Aug 11, 2025
6460cd9
Update datacite import dates and local crossref api
kaysiz Aug 11, 2025
4bc2ac7
remove unused code
kaysiz Aug 11, 2025
6b0aa36
undo delete of file
kaysiz Aug 11, 2025
8c99b15
Add v4 release documentation
kaysiz Sep 1, 2025
8b8afce
Update docker image
kaysiz Sep 1, 2025
4 changes: 2 additions & 2 deletions docker-compose.dataciteimport.yml
@@ -9,8 +9,7 @@ services:
     entrypoint:
       [
         'node_modules/.bin/wait-for-it',
-        # 'asap-ingest-jan-09.cpcwgoa3uzw1.eu-west-1.rds.amazonaws.com:5432',
-        'corpus-v3.cpcwgoa3uzw1.eu-west-1.rds.amazonaws.com:5432',
+        'corpus-v4.cpcwgoa3uzw1.eu-west-1.rds.amazonaws.com:5432',
         '--',
         'sh',
         'scripts/setupDevServer.sh',
@@ -57,3 +56,4 @@ services:
       - ./packages/server/rest:/home/node/app/rest
       - ./packages/server/scripts:/home/node/app/scripts
       - ./packages/server/services:/home/node/app/services
+      - ./logs:/home/node/app/logs
56 changes: 56 additions & 0 deletions docker-compose.europepmcimport.yml
@@ -0,0 +1,56 @@
version: '3'

services:
  importer:
    platform: linux/amd64
    build:
      context: ./packages/server
      dockerfile: ./Dockerfile-development
    entrypoint:
      [
        'node_modules/.bin/wait-for-it',
        'corpus-v4.cpcwgoa3uzw1.eu-west-1.rds.amazonaws.com:5432',
        '--',
        'sh',
        'scripts/setupDevServer.sh',
      ]
    command:
      [
        'node_modules/.bin/nodemon',
        '--max-old-space-size=8192',
        'epmcImport.js',
        '--watch',
        '--ext',
        'js,graphql',
      ]
    ports:
      - 3000:3000
    environment:
      - NODE_ENV=development
      - POSTGRES_HOST=${POSTGRES_HOST}
      - POSTGRES_PORT=${POSTGRES_PORT:-5432}
      - POSTGRES_DB=${POSTGRES_DB:-dev_db}
      - POSTGRES_USER=${POSTGRES_USER:-dev_user}
      - POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-dev_user_password}
      - PUBSWEET_SECRET=${PUBSWEET_SECRET:-superSecretThing}
      - SERVER_PORT=${SERVER_PORT:-3000}
      - HOSTNAME=${HOSTNAME}
      - CLIENT_URL=${CLIENT_URL:-http://0.0.0.0:4000}
      - PASSWORD_RESET_PATH=${PASSWORD_RESET_PATH:-password-reset}
      - S3_PROTOCOL=http
      - S3_HOST=filehosting
      - S3_PORT=${S3_PORT:-9000}
      - S3_ACCESS_KEY_ID=${S3_ACCESS_KEY_ID:-nonRootUser}
      - S3_SECRET_ACCESS_KEY=${S3_SECRET_ACCESS_KEY:-nonRootPassword}
      - S3_EUROPEPMC_BUCKET=${S3_EUROPEPMC_BUCKET}
      - S3_EUROPEPMC_FOLDER=${S3_EUROPEPMC_FOLDER}
    volumes:
      - ./packages/server/api:/home/node/app/api
      - ./packages/server/config:/home/node/app/config
      - ./packages/server/controllers:/home/node/app/controllers
      - ./packages/server/models:/home/node/app/models
      - ./packages/server/rest:/home/node/app/rest
      - ./packages/server/scripts:/home/node/app/scripts
      - ./packages/server/services:/home/node/app/services
      - ./packages/server/data:/home/node/app/data
      - ./logs:/home/node/app/logs
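Note: the `--max-old-space-size=8192` flag in the command above lines up with the memory-heap fix noted in commit 168e63d; it raises Node's default heap limit, which is presumably too small for the larger EuropePMC batches.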
48 changes: 48 additions & 0 deletions docs/v4 release.md
@@ -0,0 +1,48 @@
# EuropePMC v4 Ingestion – Release Documentation

## 1. Purpose of the Release
- **Goal:** Ingest new citations from EuropePMC (v4).
- **Outcome:** Expanded citation coverage and updated assertions table.

---

## 2. High-Level Workflow
1. **Download raw CSV files** from EuropePMC.
2. **Process and reformat** files by adding additional metadata.
3. **Store files in S3** in the correct path structure.
4. **Run ingestion pipeline:**
- Reads S3 files
- Creates activity logs
- Processes activity logs
- Runs local APIs for Crossref and ROR to mitigate rate limits (see the sketch below the diagram)
5. **Enrich data:**
- DataCite (for DOIs)
- Crossref (for accession numbers)
6. **Insert records into `assertions` table.**
7. **Post-processing cleanup** of malformed or duplicate records.
8. **Generate final data dump** for release.

![EuropePMC ingestion flow](attachments/ccf686f5-9647-4aaf-9467-12c8acd4077a.png)
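For orientation, a minimal sketch of steps 4-6 in code; every function name here (`fetchUnprocessedS3Files`, `createActivityLogs`, `processActivityLogs`) is a hypothetical placeholder, not the importer's actual API:

```js
// Illustrative sketch only; the real services live under packages/server/services.
const runIngestion = async () => {
  // 4a. list CSV files in S3 that have not been ingested yet
  const files = await fetchUnprocessedS3Files()

  // 4b. turn each file's rows into activity log records
  for (const file of files) {
    // eslint-disable-next-line no-await-in-loop
    await createActivityLogs(file)
  }

  // 4c + 5 + 6. process the logs: enrich via DataCite/Crossref,
  // then insert the resulting rows into the assertions table
  await processActivityLogs()
}
```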

---

## 3. Key Scripts and Commands
- `https://github.com/Make-Data-Count-Community/corpus-data-file/corpus-v4/data_ingestion/eupmc_reformat_csv.py`
- `https://github.com/Make-Data-Count-Community/corpus-data-file/corpus-v4/data_ingestion/eupmc_file_downloader.sh`

---

## 4. Gotchas / Lessons Learned
- Verify CSV schema changes before reformatting.
- Ensure correct S3 bucket and path to avoid ingestion failures.
- Monitor long-running pipeline jobs.
- Crossref enrichment may fail if DOI rate limits are hit.

---

## 5. Links to Artifacts
- **Raw CSV files (S3):** `s3://europepmc-files/unprocessed/`
- **Processed CSV files (S3):** `s3://europepmc-files/processed/`
- **V4 dump files (S3):** `s3://corpus-data-files/v4.1/`

---
2 changes: 2 additions & 0 deletions package.json
@@ -26,6 +26,8 @@
"@coko/lint": "^2.0.1"
},
"dependencies": {
"@aws-sdk/client-s3": "^3.825.0",
"csv-parse": "^5.6.0",
"pg-listen": "^1.7.0"
}
}
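These two new dependencies pair naturally. A minimal sketch, assuming a placeholder bucket and key, of streaming a CSV out of S3 and parsing it row by row:

```js
const { S3Client, GetObjectCommand } = require('@aws-sdk/client-s3')
const { parse } = require('csv-parse')

const client = new S3Client({ region: 'eu-west-1' })

const readCsvRows = async (bucket, key) => {
  const { Body } = await client.send(
    new GetObjectCommand({ Bucket: bucket, Key: key }),
  )

  // In Node, Body is a Readable stream, so it can be piped into the parser
  const parser = Body.pipe(parse({ columns: true }))

  for await (const row of parser) {
    console.log(row) // each row is an object keyed by the CSV header line
  }
}
```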
9 changes: 6 additions & 3 deletions packages/server/Dockerfile-development
@@ -1,8 +1,9 @@
-FROM node:16.19.0
+FROM node:16.20.2-bullseye-slim
 
 RUN apt-get update \
   && apt-get upgrade -y \
-  && apt-get install -y ranger vim p7zip-full
+  && apt-get install -y ranger vim p7zip-full build-essential python3 \
+  && rm -rf /var/lib/apt/lists/*

WORKDIR /home/node/app

@@ -15,4 +16,6 @@ USER node

 RUN yarn install --frozen-lockfile
 # RUN yarn cache clean
-COPY --chown=node:node . .
+COPY --chown=node:node . .
+
+#CMD ["node", "--max-old-space-size=4096", "importer.js"]
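The added `build-essential` and `python3` packages are most likely there so `node-gyp` can compile native dependencies, which the slim Debian base image cannot do out of the box.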
2 changes: 1 addition & 1 deletion packages/server/Dockerfile-production
@@ -1,4 +1,4 @@
-FROM node:16.19.0-buster-slim as server
+FROM node:16.19.0-bullseye-slim as server
 
 RUN apt-get update \
   && apt-get upgrade -y \
20 changes: 20 additions & 0 deletions packages/server/epmcImport.js
@@ -0,0 +1,20 @@
/*
 * Changes to this file require rebuilding the image using
 * 'docker-compose -f docker-compose.europepmcimport.yml build'
 */
// const dataCitePrefixImport = require('./services/scheduledTaskService/dataCitePrefixImport')

const epmcImport = require('./services/scheduledTaskService/epmcImport')

const init = async () => {
  try {
    // uncomment this to fetch all prefixes from datacite API and insert into DB
    // NOTE this is not idempotent - prefixes will be duplicated if run multiple times
    // await dataCitePrefixImport()

    await epmcImport()
  } catch (e) {
    throw new Error(e)
  }
}

init()
37 changes: 37 additions & 0 deletions packages/server/models/cziFileModel/cziFileModel.js
@@ -0,0 +1,37 @@
const { BaseModel } = require('@coko/server')

class CziFileModel extends BaseModel {
  constructor(properties) {
    super(properties)
    this.type = 'CziFileModel'
  }

  static get tableName() {
    return 'czi_files'
  }

  static get schema() {
    return {
      properties: {
        file_name: {
          type: ['string'],
        },
        type: {
          type: ['string'],
        },
        // NB: 'proccessed' is intentionally spelled this way to match the column name
        proccessed: {
          default: false,
          type: ['boolean'],
        },
        done: {
          default: false,
          type: ['boolean'],
        },
      },
      required: ['file_name', 'type'],
      type: 'object',
    }
  }
}

module.exports = CziFileModel
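A hedged sketch of how this model might be used to skip files that were already ingested, assuming the Objection.js query interface that `@coko/server`'s `BaseModel` exposes; `pickNextFile` is a hypothetical helper, not part of this PR:

```js
const CziFileModel = require('./cziFileModel')

// Hypothetical helper: claim the next unprocessed file and mark it processed.
const pickNextFile = async () => {
  // note the column really is spelled 'proccessed' in the schema above
  const file = await CziFileModel.query().findOne({ proccessed: false })
  if (!file) return null

  await CziFileModel.query().patch({ proccessed: true }).findById(file.id)
  return file.file_name
}
```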
7 changes: 7 additions & 0 deletions packages/server/models/cziFileModel/index.js
@@ -0,0 +1,7 @@
/* eslint-disable global-require */
const model = require('./cziFileModel')

module.exports = {
  model,
  modelName: 'CziFileModel',
}
4 changes: 4 additions & 0 deletions packages/server/services/assertionFactory/assertionFactory.js
@@ -21,6 +21,10 @@ class AssertionFactory {
       DataciteToAssertion,
       CrossrefToAssertion
     ],
+    eupmc: [
+      DataciteToAssertion,
+      CrossrefToAssertion
+    ],
     czi: [DataciteToAssertion, CrossrefToAssertion, CziToAssertion],
   }

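The table above reads as an ordered pipeline: each source's transformers run in sequence against the same assertion instance. A sketch of that idea; the `transform` method name and signature are assumptions, not the factory's actual API:

```js
// Hypothetical runner for the pipelines table above.
async function applyPipeline(pipelines, source, chunk, assertionInstance, trx) {
  for (const Transformer of pipelines[source]) {
    // transformers run in order, each enriching the same instance
    // eslint-disable-next-line no-await-in-loop
    await new Transformer().transform(chunk, assertionInstance, trx)
  }
}
```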
17 changes: 16 additions & 1 deletion packages/server/services/assertionFactory/crossrefToAssertion.js
@@ -1,5 +1,5 @@
 /* eslint-disable no-param-reassign */
-const { Publisher, Journal } = require('@pubsweet/models')
+const { Publisher, Journal, Repository } = require('@pubsweet/models')
 
 class CrossrefToAssertion {
   // eslint-disable-next-line class-methods-use-this
@@ -31,6 +31,21 @@

       assertionInstance.journalId = journal.id
     }
+
+    if (chunk.repository) {
+      const title = chunk.repository
+
+      const exists = await Repository.query(trx).findOne({ title })
+      let repository = exists
+
+      if (!exists) {
+        repository = await Repository.query(trx)
+          .insert({ title })
+          .returning('*')
+      }
+
+      assertionInstance.repositoryId = repository.id
+    }
   }
 }

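One thing worth noting: with 50 parallel import streams (see `corpusData.js` below), the find-then-insert above can race and create duplicate repositories. A hedged alternative, assuming a unique index on `title` and a knex version (>= 0.95) whose `onConflict` Objection exposes:

```js
// Sketch only: upsert instead of find-then-insert, so concurrent streams
// cannot create duplicate rows for the same repository title.
const repository = await Repository.query(trx)
  .insert({ title })
  .onConflict('title')
  .merge()
  .returning('*')

assertionInstance.repositoryId = repository.id
```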
4 changes: 2 additions & 2 deletions packages/server/services/awsS3Service/index.js
@@ -4,8 +4,8 @@ const { logger } = require('@coko/server')
 class AwsS3Service {
   constructor() {
     AWS.config.update({
-      accessKeyId: 'xzy', // TODO fetch these creds from environments variables
-      secretAccessKey: 'xyz',
+      accessKeyId: process.env.S3_ACCESS_KEY_ID,
+      secretAccessKey: process.env.S3_SECRET_ACCESS_KEY,
       region: 'eu-west-1',
     })

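For context, a sketch of how this v2-style client might list candidate files using the `S3_EUROPEPMC_BUCKET` / `S3_EUROPEPMC_FOLDER` variables the new compose file injects; this is an assumption about the service's shape, not an actual method in this PR:

```js
// Hypothetical listing helper; listObjectsV2 returns at most 1000 keys per
// call, so a production version would loop on ContinuationToken.
const listEuropePmcFiles = async s3 => {
  const { Contents = [] } = await s3
    .listObjectsV2({
      Bucket: process.env.S3_EUROPEPMC_BUCKET,
      Prefix: process.env.S3_EUROPEPMC_FOLDER,
    })
    .promise()

  return Contents.map(object => object.Key)
}
```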
16 changes: 15 additions & 1 deletion packages/server/services/axiosService/index.js
@@ -31,12 +31,26 @@ module.exports = {
     return response
   },
   crossrefApi: (url, headers = {}) => {
-    url = `${url}/transform/application/vnd.crossref.unixsd+xml`
+    url = `${url}/transform/application/vnd.crossref.unixsd+xml?mailto=${encodeURIComponent('kuda.siziva@datacite.org')}`
 
     const response = request({
       url,
       baseURL: 'https://api.crossref.org',
       method: 'get',
       headers: {
         'Content-Type': 'application/json',
+        'User-Agent': `DataCite-Corpus-App/4.0 (https://datacite.org/; mailto:kuda.siziva@datacite.org)`,
         ...headers,
       },
     })
 
     return response
   },
+  crossrefLocalApi: (url, headers = {}) => {
+    const response = request({
+      url,
+      baseURL: 'http://host.docker.internal:8000',
+      method: 'get',
+      headers: {
+        'Content-Type': 'application/json',
+        ...headers,
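A possible way to combine the two clients, preferring the local mirror and falling back to the public API; the require path, the URL scheme, and the fallback policy are all assumptions:

```js
const { crossrefApi, crossrefLocalApi } = require('./services/axiosService')

// Hypothetical wrapper: try the local Crossref mirror first, fall back to
// api.crossref.org if the mirror is down or does not know the DOI.
const fetchCrossrefRecord = async doi => {
  try {
    return await crossrefLocalApi(`/works/${doi}`)
  } catch (e) {
    return crossrefApi(`/works/${doi}`)
  }
}
```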
38 changes: 24 additions & 14 deletions packages/server/services/corpusData.js
@@ -7,7 +7,7 @@ const AssertionFactory = require('./assertionFactory/assertionFactory')
 const ActivityLog = require('../models/activityLog/activityLog')
 const Source = require('../models/source/source')
 
-const NUMBER_OF_PARALLEL_IMPORT_STREAMS = 5
+const NUMBER_OF_PARALLEL_IMPORT_STREAMS = 50
 
 class CorpusData {
   constructor(seedSource, metadataSource) {
@@ -58,26 +58,29 @@ class CorpusData {
       return 0
     }
 
-    await ActivityLog.query()
-      .patch({
-        proccessed: true,
-      })
-      .findById(activityLogRecord.id)
-
     const data = JSON.parse(res.data)
 
     data.forEach(citation => {
-      const { id } = sources.find(
+      const { id, abbreviation } = sources.find(
         s => s.abbreviation === res.action.replace('assertion_incoming_', ''),
       )
 
       if (id) {
-        const assertions = {
-          activityId: activityLogRecord.id,
-          source: id,
-          event: citation,
-          datacite: {},
-          crossref: {},
-        }
+        let assertions
+
+        if (abbreviation === 'eupmc') {
+          assertions = {
+            activityId: activityLogRecord.id,
+            ...citation,
+          }
+        } else {
+          assertions = {
+            activityId: activityLogRecord.id,
+            source: id,
+            event: citation,
+            datacite: {},
+            crossref: {},
+          }
+        }
 
         metadataSource.startStreamCitations(assertions)
@@ -95,6 +98,13 @@
       `Saving ${result.length} assertions for activity log ${activityLogRecord.id}...`,
     )
     await AssertionFactory.saveDataToAssertionModel(result)
+
+    await ActivityLog.query()
+      .patch({
+        proccessed: true,
+      })
+      .findById(activityLogRecord.id)
+
     return result.length
   } catch (e) {
     logger.info(e)
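A sketch of the loop this constant implies: take unprocessed activity logs in batches of `NUMBER_OF_PARALLEL_IMPORT_STREAMS` and process each batch concurrently. The `processActivityLog` method name is a placeholder; moving the `proccessed: true` patch after the save (as this diff does) is what makes a crashed run safely resumable:

```js
// Hypothetical driver loop around the per-log processing shown above;
// reuses the ActivityLog import and the constant from this file's scope.
const processAllLogs = async corpusData => {
  for (;;) {
    // eslint-disable-next-line no-await-in-loop
    const batch = await ActivityLog.query()
      .where({ proccessed: false })
      .limit(NUMBER_OF_PARALLEL_IMPORT_STREAMS)

    if (batch.length === 0) break

    // eslint-disable-next-line no-await-in-loop
    await Promise.all(batch.map(log => corpusData.processActivityLog(log)))
  }
}
```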