Skip to content
121 changes: 121 additions & 0 deletions app/jobs/tracebook/llm_redaction_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# frozen_string_literal: true

module Tracebook
  # Background job for asynchronous, LLM-based PII redaction.
  #
  # Runs a second redaction pass over an interaction after the initial
  # pattern-based redaction, to catch PII that patterns miss (names,
  # addresses, context-sensitive data).
  #
  # The interaction row is loaded with a pessimistic lock inside a database
  # transaction, so the lock is held until the redaction pass has been saved.
  # Failures are re-raised so ActiveJob can retry them with polynomially
  # increasing delays.
  #
  # @example Enqueue for async processing
  #   LlmRedactionJob.perform_later(interaction.id)
  #
  # @example From retry_async failure mode
  #   # When LLMBased redactor fails with on_failure: :retry_async,
  #   # this job can be enqueued to retry later
  #   LlmRedactionJob.perform_later(interaction.id)
  class LlmRedactionJob < ApplicationJob
    queue_as :tracebook_llm_redaction

    # Retry configuration. Only :max_attempts is read by the retry_on
    # declaration below; :base_delay and :max_delay are not referenced
    # anywhere else in this class.
    RETRY_CONFIG = {
      max_attempts: 3,
      base_delay: 30.seconds,
      max_delay: 5.minutes
    }.freeze

    # Retry on standard errors with polynomial backoff
    retry_on StandardError,
             wait: :polynomially_longer,
             attempts: RETRY_CONFIG[:max_attempts]

    # Don't retry if the interaction was deleted. Declared after retry_on
    # because rescue handlers are matched most-recently-declared first, so
    # this one wins for RecordNotFound.
    discard_on ActiveRecord::RecordNotFound

    # Redacts the stored payloads and text of one interaction.
    #
    # @param interaction_id [Integer] ID of interaction to process
    # @return [Boolean, nil] nil when the interaction was already redacted
    #   successfully, otherwise the result of the final save!
    # @raise [StandardError] any error is re-raised after the failure has been
    #   recorded on the interaction, so ActiveJob can retry or discard
    def perform(interaction_id)
      # Declared outside the block so the rescue clause below can see it.
      interaction = nil

      # A row lock only lasts as long as the surrounding transaction; without
      # one it is released as soon as the SELECT completes. The lock is held
      # while the redactor runs, so concurrent jobs for the same interaction
      # wait here and then observe the "success" status.
      Interaction.transaction do
        interaction = Interaction.lock.find(interaction_id)

        # Skip if already processed successfully
        next if llm_redaction_complete?(interaction)

        redactor = Tracebook.config.llm_redactor
        if redactor
          process_redaction(interaction, redactor)
        else
          mark_skipped(interaction, "no_llm_redactor_configured")
        end
      end
    rescue StandardError => error
      # Runs after the transaction has rolled back, so the failure metadata
      # is written on its own rather than being rolled back with the attempt.
      record_failure(interaction, error) if interaction
      raise # Let ActiveJob handle retry
    end

    private

    # @return [Boolean] true when metadata records a successful LLM pass
    def llm_redaction_complete?(interaction)
      interaction.metadata&.dig("llm_redaction_status") == "success"
    end

    # Runs the redactor over every populated payload/text field, marks the
    # interaction as successfully redacted and saves it.
    def process_redaction(interaction, redactor)
      if interaction.request_payload.present?
        interaction.request_payload = redact_payload(interaction.request_payload, redactor)
      end

      if interaction.response_payload.present?
        interaction.response_payload = redact_payload(interaction.response_payload, redactor)
      end

      if interaction.request_text.present?
        interaction.request_text = redact_text(interaction.request_text, redactor)
      end

      if interaction.response_text.present?
        interaction.response_text = redact_text(interaction.response_text, redactor)
      end

      update_metadata(interaction, {
        "llm_redaction_status" => "success",
        "llm_redacted_at" => Time.current.iso8601
      })

      interaction.save!
    end

    # Serializes a structured payload to JSON, redacts the JSON string and
    # parses the redacted string back into a structure.
    def redact_payload(payload, redactor)
      redacted_json, = redactor.call(JSON.generate(payload), audit: RedactionAudit.new)
      JSON.parse(redacted_json)
    end

    # Redacts a plain string; only the first element of the redactor's
    # result is used.
    def redact_text(text, redactor)
      redacted, = redactor.call(text, audit: RedactionAudit.new)
      redacted
    end

    # Marks the interaction as skipped (with a reason) and saves it.
    def mark_skipped(interaction, reason)
      update_metadata(interaction, {
        "llm_redaction_status" => "skipped",
        "llm_redaction_reason" => reason
      })
      interaction.save!
    end

    # Stores failure details and an attempt counter in metadata. Errors while
    # saving are logged and swallowed so the original error still propagates.
    #
    # NOTE(review): error.message is stored verbatim. Some exceptions (for
    # example JSON::ParserError) can embed a fragment of the text they were
    # processing - confirm this cannot write unredacted content to metadata.
    def record_failure(interaction, error)
      current_attempts = interaction.metadata&.dig("llm_redaction_attempts").to_i

      update_metadata(interaction, {
        "llm_redaction_status" => "failed",
        "llm_redaction_error" => error.message,
        "llm_redaction_attempts" => current_attempts + 1,
        "llm_redaction_last_attempt" => Time.current.iso8601
      })
      interaction.save!
    rescue StandardError => save_error
      # Log but don't raise - the original error should propagate
      Rails.logger.error("[TraceBook] Failed to record LLM redaction failure: #{save_error.message}")
    end

    # Merges +updates+ into the interaction's metadata hash (nil is treated
    # as an empty hash). Does not save.
    def update_metadata(interaction, updates)
      interaction.metadata = (interaction.metadata || {}).merge(updates)
    end
  end
end
25 changes: 24 additions & 1 deletion app/jobs/tracebook/persist_interaction_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,36 @@ def persist_interaction(normalized, cost)

ActiveRecord::Base.transaction do
Interaction.create!(attributes).tap do |interaction|
interaction.actor = normalized.actor if normalized.actor
assign_actor(interaction, normalized)
persist_payloads(interaction, normalized)
interaction.save! if interaction.changed?
end
end
end

# Attaches the actor described by +normalized+ to +interaction+.
#
# Sources are tried in order of preference:
# 1. the record resolved from normalized.actor_gid,
# 2. the serialized actor_type / actor_id pair,
# 3. a raw actor object (legacy flow, kept for backwards compatibility).
#
# A GlobalID that cannot be resolved (deserialize_actor returns nil) no
# longer ends the search with a nil assignment; the remaining sources are
# still tried.
#
# @param interaction [Interaction] record to update in memory (not saved here)
# @param normalized [NormalizedInteraction] source of actor information
def assign_actor(interaction, normalized)
  if normalized.actor_gid.present?
    located = deserialize_actor(normalized)
    if located
      interaction.actor = located
      return
    end
  end

  if normalized.actor_type.present? && normalized.actor_id.present?
    # Use serialized type/id when no GlobalID could be resolved
    interaction.actor_type = normalized.actor_type
    interaction.actor_id = normalized.actor_id
  elsif normalized.actor
    # Legacy: raw actor object (shouldn't happen with new flow, but backwards compatible)
    interaction.actor = normalized.actor
  end
end

# Resolves normalized.actor_gid back into an object via GlobalID.
#
# Returns nil when no GlobalID is present, and also when the lookup raises
# one of the rescued errors (a warning is logged in that case).
#
# NOTE(review): confirm that GlobalID::ParseError is defined by the installed
# globalid gem. Rescue class lists are only evaluated once an exception is
# raised, so an undefined constant here would surface as a NameError instead
# of the intended warning.
def deserialize_actor(normalized)
  gid = normalized.actor_gid
  return unless gid.present?

  GlobalID::Locator.locate(gid)
rescue GlobalID::ParseError, ActiveRecord::RecordNotFound => lookup_error
  Rails.logger.warn "TraceBook: Could not deserialize actor from #{normalized.actor_gid}: #{lookup_error.message}"
  nil
end

# Adds the input and output token counts of +normalized+. Each count is
# coerced with #to_i first, so a nil count contributes 0.
def total_tokens(normalized)
  input_count = normalized.input_tokens.to_i
  output_count = normalized.output_tokens.to_i
  input_count + output_count
end
Expand Down
81 changes: 0 additions & 81 deletions app/models/tracebook/redaction_rule.rb

This file was deleted.

19 changes: 0 additions & 19 deletions db/migrate/20251112000400_create_tracebook_redaction_rules.rb

This file was deleted.

56 changes: 54 additions & 2 deletions lib/tracebook.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
require "tracebook/result"
require "tracebook/normalized_interaction"
require "tracebook/redaction_pipeline"
require "tracebook/redaction_audit"
require "tracebook/pricing"
require "tracebook/adapters"
require "tracebook/seeds/pricing_rules"
Expand Down Expand Up @@ -96,6 +97,38 @@ def reset_configuration!
@configuration_finalized = false
end

# Serializes an actor for job-safe persistence.
#
# Converts an ActiveRecord object (or similar) into a hash that can be
# safely passed to background jobs. Prefers GlobalID when available for
# reliable deserialization, and falls back to a type/id tuple otherwise,
# including when building the GlobalID raises (GlobalID is expected to
# refuse records that have no id yet, for example).
#
# @param actor [ActiveRecord::Base, nil] The actor to serialize
# @return [Hash] Serialized actor data with :actor_gid or :actor_type/:actor_id keys;
#   empty when the actor is nil or exposes neither a GlobalID nor an id
#
# @example With a User model (GlobalID available)
#   Tracebook.serialize_actor(User.find(1))
#   # => { actor_gid: "gid://myapp/User/1" }
#
# @example With a plain object (no GlobalID)
#   Tracebook.serialize_actor(some_object)
#   # => { actor_type: "SomeObject", actor_id: 123 }
#
# @example With nil
#   Tracebook.serialize_actor(nil)
#   # => {}
def serialize_actor(actor)
  return {} unless actor

  if actor.respond_to?(:to_global_id)
    begin
      return { actor_gid: actor.to_global_id.to_s }
    rescue StandardError
      # Building the GlobalID failed; actor serialization is auxiliary to
      # recording the interaction, so fall through to the type/id tuple
      # instead of aborting the caller.
    end
  end

  return {} unless actor.respond_to?(:id) && actor.class.respond_to?(:name)

  { actor_type: actor.class.name, actor_id: actor.id }
end

# Records an LLM interaction.
#
# When `config.persist_async` is true, the interaction is enqueued via
Expand Down Expand Up @@ -150,14 +183,18 @@ def reset_configuration!
# latency_ms: 30000
# )
def record!(**attributes)
# Build normalized interaction and apply redaction BEFORE job enqueue
# This ensures no raw PII ever enters the job queue (critical security fix)
payload = build_normalized_interaction(attributes)
redacted_payload = apply_redaction(payload)

result = Result.new(idempotency_key: attributes[:idempotency_key])

if config.persist_async
PersistInteractionJob.perform_later(payload.to_h)
PersistInteractionJob.perform_later(redacted_payload.to_h)
result
else
interaction = PersistInteractionJob.perform_now(payload.to_h)
interaction = PersistInteractionJob.perform_now(redacted_payload.to_h)
Result.new(interaction: interaction, idempotency_key: attributes[:idempotency_key])
end
rescue StandardError => error
Expand All @@ -177,6 +214,21 @@ def ensure_configurable!
raise ConfigurationError, "TraceBook configuration is already finalized"
end

# Runs the redaction pipeline over +normalized+ and returns a new
# NormalizedInteraction that is safe to hand to a background job.
#
# The actor is serialized before the pipeline runs (the original author
# notes that deep_dup does not handle arbitrary objects well). The raw
# :actor object and :redaction_audit are then dropped from the result:
# the former is replaced by its serialized form, the latter is not
# serializable by ActiveJob and is meant for call-time observability only.
def apply_redaction(normalized)
  serialized_actor = serialize_actor(normalized.actor)

  redacted = RedactionPipeline.new(config: config).call(normalized)

  job_safe_attributes = redacted.to_h.except(:actor, :redaction_audit)
  NormalizedInteraction.new(**job_safe_attributes.merge(serialized_actor))
end

def build_normalized_interaction(attributes)
NormalizedInteraction.new(
provider: attributes.fetch(:provider),
Expand Down
Loading