Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/.env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -196,4 +196,6 @@ EVENT_TENANT_LISTENER_API='http://interface:3567/scp/v1/tenant/add'
#Enable / disable organization event
EVENT_ENABLE_ORGANIZATION_EVENTS=true
#Event Kafka topic for organization create/update
EVENT_ORGANIZATION_KAFKA_TOPIC='dev.organizationEvent'
EVENT_ORGANIZATION_KAFKA_TOPIC='dev.organizationEvent'
#Service name for health Check
SERVICE_NAME = 'UserService'
4,122 changes: 3,638 additions & 484 deletions src/api-doc/MentorED-Users.postman_collection.json

Large diffs are not rendered by default.

37 changes: 35 additions & 2 deletions src/configs/kafka.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,46 +14,79 @@ const { elevateLog } = require('elevate-logger')
const logger = elevateLog.init()

module.exports = async () => {
console.log('🚀 [USER SERVICE KAFKA] ===== STARTING KAFKA CONFIGURATION =====')
console.log('🚀 [USER SERVICE KAFKA] Environment variables:')
console.log('🚀 [USER SERVICE KAFKA] KAFKA_URL:', process.env.KAFKA_URL)
console.log('🚀 [USER SERVICE KAFKA] KAFKA_GROUP_ID:', process.env.KAFKA_GROUP_ID)
console.log('🚀 [USER SERVICE KAFKA] EVENT_USER_KAFKA_TOPIC:', process.env.EVENT_USER_KAFKA_TOPIC)
console.log('🚀 [USER SERVICE KAFKA] CLEAR_INTERNAL_CACHE:', process.env.CLEAR_INTERNAL_CACHE)

const kafkaIps = process.env.KAFKA_URL.split(',')
console.log('🚀 [USER SERVICE KAFKA] Kafka brokers:', kafkaIps)

const KafkaClient = new Kafka({
clientId: 'mentoring',
clientId: 'user-service',
brokers: kafkaIps,
})

console.log('🚀 [USER SERVICE KAFKA] Creating producer and consumer...')
const producer = KafkaClient.producer()
const consumer = KafkaClient.consumer({ groupId: process.env.KAFKA_GROUP_ID })

console.log('🚀 [USER SERVICE KAFKA] Connecting producer...')
await producer.connect()
console.log('🚀 [USER SERVICE KAFKA] ✅ Producer connected successfully')

console.log('🚀 [USER SERVICE KAFKA] Connecting consumer...')
await consumer.connect()
console.log('🚀 [USER SERVICE KAFKA] ✅ Consumer connected successfully')

producer.on('producer.connect', () => {
logger.info(`KafkaProvider: connected`)
console.log('🚀 [USER SERVICE KAFKA] Producer event: connected')
})
producer.on('producer.disconnect', () => {
logger.error(`KafkaProvider: could not connect`, {
triggerNotification: true,
})
console.log('🚀 [USER SERVICE KAFKA] Producer event: disconnected')
})

const subscribeToConsumer = async () => {
await consumer.subscribe({ topics: [process.env.CLEAR_INTERNAL_CACHE] })
console.log('🚀 [USER SERVICE KAFKA] Setting up consumer subscriptions...')
const topics = [process.env.CLEAR_INTERNAL_CACHE].filter(Boolean)
console.log('🚀 [USER SERVICE KAFKA] Subscribing to topics:', topics)

await consumer.subscribe({ topics })
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
try {
console.log('🚀 [USER SERVICE KAFKA] Cache clear message received:', {
topic,
partition,
offset: message.offset
})
let streamingData = JSON.parse(message.value)
if (streamingData.type == 'CLEAR_INTERNAL_CACHE') {
console.log('🚀 [USER SERVICE KAFKA] Processing cache clear for:', streamingData.value)
utils.internalDel(streamingData.value)
}
} catch (error) {
console.log('🚀 [USER SERVICE KAFKA] ❌ Consumer error:', error.message)
logger.error('Subscribe to consumer failed:' + error, {
triggerNotification: true,
})
}
},
})
}

console.log('🚀 [USER SERVICE KAFKA] Starting consumer subscriptions...')
subscribeToConsumer()

console.log('🚀 [USER SERVICE KAFKA] Setting global variables...')
global.kafkaProducer = producer
global.kafkaClient = KafkaClient

console.log('🚀 [USER SERVICE KAFKA] ===== KAFKA CONFIGURATION COMPLETED =====')
}
1 change: 1 addition & 0 deletions src/controllers/v1/account.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ module.exports = class Account {
}

const device_info = req.headers && req.headers['device-info'] ? JSON.parse(req.headers['device-info']) : {}

try {
const createdAccount = await accountService.create(params, device_info, domain)
return createdAccount
Expand Down
5 changes: 5 additions & 0 deletions src/envVariables.js
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,11 @@ let enviromentVariables = {
optional: true,
default: 'x-tenant-code',
},
SERVICE_NAME: {
message: 'Required SERVICE_NAME',
optional: true,
default: 'UserService',
},
}
let success = true

Expand Down
70 changes: 67 additions & 3 deletions src/generics/kafka-communication.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,42 @@ const pushEmailToKafka = async (message) => {

const pushUserEventsToKafka = async (message) => {
try {
const payload = { topic: process.env.EVENT_USER_KAFKA_TOPIC, messages: [{ value: JSON.stringify(message) }] }
return await pushPayloadToKafka(payload)
console.log('📤 [USER KAFKA PRODUCER] ===== KAFKA USER EVENT DETAILS =====')
console.log('📤 [USER KAFKA PRODUCER] Target Topic:', process.env.EVENT_USER_KAFKA_TOPIC)
console.log('📤 [USER KAFKA PRODUCER] Kafka Producer Status:', kafkaProducer ? 'Connected' : 'Not Connected')
console.log('📤 [USER KAFKA PRODUCER] Message Details:')
console.log('📤 [USER KAFKA PRODUCER] Entity:', message.entity)
console.log('📤 [USER KAFKA PRODUCER] Event Type:', message.eventType)
console.log('📤 [USER KAFKA PRODUCER] Entity ID:', message.entityId)
console.log('📤 [USER KAFKA PRODUCER] Tenant Code:', message.tenant_code)
console.log('📤 [USER KAFKA PRODUCER] Created By:', message.created_by)
console.log('📤 [USER KAFKA PRODUCER] Organizations:', message.organizations?.length || 0, 'orgs')

const messageString = JSON.stringify(message)
console.log('📤 [USER KAFKA PRODUCER] Payload Size:', messageString.length, 'bytes')
console.log('📤 [USER KAFKA PRODUCER] First 200 chars:', messageString.substring(0, 200) + '...')

const payload = { topic: process.env.EVENT_USER_KAFKA_TOPIC, messages: [{ value: messageString }] }
console.log('📤 [USER KAFKA PRODUCER] Kafka Payload Prepared:')
console.log('📤 [USER KAFKA PRODUCER] Topic:', payload.topic)
console.log('📤 [USER KAFKA PRODUCER] Messages Count:', payload.messages.length)

console.log('📤 [USER KAFKA PRODUCER] Sending to Kafka...')
const response = await pushPayloadToKafka(payload)

console.log('📤 [USER KAFKA PRODUCER] ✅ Kafka Response Received:')
console.log('📤 [USER KAFKA PRODUCER] Success:', response ? 'Yes' : 'No')
if (response && response[0]) {
console.log('📤 [USER KAFKA PRODUCER] Topic:', response[0].topicName)
console.log('📤 [USER KAFKA PRODUCER] Partition:', response[0].partition)
console.log('📤 [USER KAFKA PRODUCER] Offset:', response[0].baseOffset)
}
console.log('📤 [USER KAFKA PRODUCER] ===== KAFKA USER EVENT COMPLETED =====')

return response
} catch (error) {
console.log(error)
console.log('📤 [USER KAFKA PRODUCER] ❌ ERROR in pushUserEventsToKafka:', error.message)
console.log('📤 [USER KAFKA PRODUCER] ❌ Error stack:', error.stack)
return error
}
}
Expand Down Expand Up @@ -50,9 +82,41 @@ const pushOrganizationEventsToKafka = async (message) => {

const pushPayloadToKafka = async (payload) => {
try {
console.log('📤 [KAFKA PRODUCER] ===== SENDING PAYLOAD TO KAFKA =====')
console.log('📤 [KAFKA PRODUCER] Producer Ready:', !!kafkaProducer)
console.log('📤 [KAFKA PRODUCER] Payload Topic:', payload.topic)
console.log('📤 [KAFKA PRODUCER] Message Count:', payload.messages.length)
console.log('📤 [KAFKA PRODUCER] Kafka Brokers:', process.env.KAFKA_URL || 'Not configured')

console.log('📤 [KAFKA PRODUCER] Calling kafkaProducer.send()...')
const startTime = Date.now()
let response = await kafkaProducer.send(payload)
const endTime = Date.now()

console.log('📤 [KAFKA PRODUCER] ✅ Kafka Send Completed:')
console.log('📤 [KAFKA PRODUCER] Duration:', endTime - startTime, 'ms')
console.log('📤 [KAFKA PRODUCER] Response Type:', typeof response)
console.log('📤 [KAFKA PRODUCER] Response Array Length:', Array.isArray(response) ? response.length : 'Not an array')

if (response && Array.isArray(response) && response.length > 0) {
response.forEach((result, index) => {
console.log(`📤 [KAFKA PRODUCER] Result ${index}:`)
console.log(`📤 [KAFKA PRODUCER] Topic: ${result.topicName}`)
console.log(`📤 [KAFKA PRODUCER] Partition: ${result.partition}`)
console.log(`📤 [KAFKA PRODUCER] Base Offset: ${result.baseOffset}`)
console.log(`📤 [KAFKA PRODUCER] Log Start Offset: ${result.logStartOffset}`)
})
}
console.log('📤 [KAFKA PRODUCER] ===== KAFKA SEND COMPLETED =====')

return response
} catch (error) {
console.log('📤 [KAFKA PRODUCER] ❌ ERROR in pushPayloadToKafka:', error.message)
console.log('📤 [KAFKA PRODUCER] ❌ Error details:', {
name: error.name,
code: error.code,
stack: error.stack?.split('\n').slice(0, 3).join('\n')
})
return error
}
}
Expand Down
10 changes: 7 additions & 3 deletions src/health-checks/health-check.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,21 @@
const { healthCheckHandler } = require('elevate-services-health-check')
const healthCheckConfig = require('./health.config')
const { v1: uuidv1 } = require('uuid')
const packageFile = require('../package.json')

let health_check = async function (req, res) {
try {
const response = await healthCheckHandler(healthCheckConfig, req.query.basicCheck, req.query.serviceName)
const response = await healthCheckHandler(
healthCheckConfig,
req.query.basicCheck,
req.query.serviceName,
packageFile.version
)
res.status(200).json(response)
} catch (err) {
console.error('Health config validation failed:', err.message || err)
res.status(400).json({
id: 'userService.Health.API',
ver: '1.0',
ts: new Date(),
params: {
resmsgid: uuidv1(),
Expand All @@ -41,7 +46,6 @@ let healthCheckStatus = function (req, res) {
let response = function (req, result) {
return {
id: 'User.service.Health.API',
ver: '1.0',
ts: new Date(),
params: {
resmsgid: uuidv1(),
Expand Down
1 change: 0 additions & 1 deletion src/health-checks/health.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

module.exports = {
name: process.env.SERVICE_NAME,
version: '1.0.0',
checks: {
kafka: {
enabled: true,
Expand Down
22 changes: 10 additions & 12 deletions src/helpers/eventBroadcasterMain.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ const isEventEnabled = (eventGroup) => {
case 'organizationEvents':
return process.env.EVENT_ENABLE_ORG_EVENTS !== 'false'
case 'userEvents':
return process.env.EVENT_ENABLE_USER_EVENTS !== 'false'
const apiEnabled = process.env.EVENT_ENABLE_USER_EVENTS !== 'false'
return apiEnabled
case 'tenantEvents':
return process.env.EVENT_ENABLE_TENANT_EVENTS !== 'false'
case 'userEvents-kafka':
return process.env.EVENT_ENABLE_USER_KAFKA_EVENTS !== 'false'
const kafkaEnabled = process.env.EVENT_ENABLE_USER_KAFKA_EVENTS !== 'false'
return kafkaEnabled
case 'tenantEvents-kafka':
return process.env.EVENT_ENABLE_TENANT_KAFKA_EVENTS !== 'false'
case 'organizationEvents-kafka':
Expand All @@ -43,7 +45,6 @@ const isEventEnabled = (eventGroup) => {

exports.eventBroadcasterMain = async (eventGroup, { requestBody, headers = {}, isInternal = true }) => {
try {
console.log('API Event ')
if (!requestBody) throw new Error('Event Body Generation Failed')
if (!isEventEnabled(eventGroup)) throw new Error(`Events Not Enabled For The Group "${eventGroup}"`)
if (isInternal) headers.internal_access_token = process.env.INTERNAL_ACCESS_TOKEN
Expand All @@ -52,13 +53,12 @@ exports.eventBroadcasterMain = async (eventGroup, { requestBody, headers = {}, i
return requester.post(endPoint, '', headers, requestBody)
})
const results = await Promise.allSettled(requestPromises)
console.log('PROMISE ------->>> ', results)
results.forEach((result, index) => {
if (result.status === 'rejected')
console.error(`Error for endpoint ${endPoints[index].url}:`, result.reason)
})
} catch (err) {
console.log(err)
console.log(`[EVENT BROADCASTER] API Event error: ${err.message}`)
}
}
exports.eventBroadcasterKafka = async (eventGroup, { requestBody }) => {
Expand All @@ -78,17 +78,14 @@ exports.eventBroadcasterKafka = async (eventGroup, { requestBody }) => {
await kafkaCommunication.pushTenantEventsToKafka(requestBody)
break
default:
console.log('No Kafka Event Group Found')
break
}
} catch (err) {
console.log(err)
console.log(`[EVENT BROADCASTER] Kafka Event error: ${err.message}`)
}
}
exports.broadcastEvent = async (eventGroup, { requestBody, headers = {}, isInternal = true }) => {
try {
//TODO: Remove this log after testing
console.log(util.inspect(requestBody, { depth: null, colors: true, compact: false }))

// Fire both broadcaster functions concurrently
const broadcastPromises = [
Expand All @@ -102,12 +99,13 @@ exports.broadcastEvent = async (eventGroup, { requestBody, headers = {}, isInter
// Check for failed promises and throw warnings
results.forEach((result, index) => {
if (result.status === 'rejected') {
const broadcaster = index === 0 ? 'eventBroadcasterMain' : 'eventBroadcasterKafka'
console.warn(`Warning: ${broadcaster} failed for eventGroup "${eventGroup}": ${result.reason}`)
const broadcaster = index === 0 ? 'eventBroadcasterMain (API)' : 'eventBroadcasterKafka'
} else {
const broadcaster = index === 0 ? 'eventBroadcasterMain (API)' : 'eventBroadcasterKafka'
}
})
} catch (err) {
// Log any unexpected errors from the promise settlement
console.error('Error in broadcastEvent:', err)
console.error('[EVENT BROADCASTER] Error in broadcastEvent:', err)
}
}
4 changes: 2 additions & 2 deletions src/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "com.shikshalokam.mentoring.userservice",
"version": "1.0.0",
"version": "3.3.24",
"description": "A user service for mentoring capability",
"main": "app.js",
"bin": {
Expand Down Expand Up @@ -45,7 +45,7 @@
"elevate-encryption": "^1.0.1",
"elevate-logger": "^3.1.0",
"elevate-node-cache": "^1.0.6",
"elevate-services-health-check": "^0.0.6",
"elevate-services-health-check": "^0.0.9",
"email-validator": "^2.0.4",
"express": "^4.17.1",
"express-fileupload": "^1.2.1",
Expand Down
Loading