Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/ldclient-rb/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@ class DataSystemConfig
# The (optional) builder proc for FDv1-compatible fallback synchronizer
#
def initialize(initializers: nil, primary_synchronizer: nil, secondary_synchronizer: nil,
data_store_mode: LaunchDarkly::Interfaces::DataStoreMode::READ_ONLY, data_store: nil, fdv1_fallback_synchronizer: nil)
data_store_mode: LaunchDarkly::Interfaces::DataSystem::DataStoreMode::READ_ONLY, data_store: nil, fdv1_fallback_synchronizer: nil)
@initializers = initializers
@primary_synchronizer = primary_synchronizer
@secondary_synchronizer = secondary_synchronizer
Expand Down
6 changes: 3 additions & 3 deletions lib/ldclient-rb/data_system.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def initialize
@primary_synchronizer = nil
@secondary_synchronizer = nil
@fdv1_fallback_synchronizer = nil
@data_store_mode = LaunchDarkly::Interfaces::DataStoreMode::READ_ONLY
@data_store_mode = LaunchDarkly::Interfaces::DataSystem::DataStoreMode::READ_ONLY
@data_store = nil
end

Expand Down Expand Up @@ -205,7 +205,7 @@ def self.custom
# @return [ConfigBuilder]
#
def self.daemon(store)
custom.data_store(store, LaunchDarkly::Interfaces::DataStoreMode::READ_ONLY)
custom.data_store(store, LaunchDarkly::Interfaces::DataSystem::DataStoreMode::READ_ONLY)
end

#
Expand All @@ -219,7 +219,7 @@ def self.daemon(store)
# @return [ConfigBuilder]
#
def self.persistent_store(store)
default.data_store(store, LaunchDarkly::Interfaces::DataStoreMode::READ_WRITE)
default.data_store(store, LaunchDarkly::Interfaces::DataSystem::DataStoreMode::READ_WRITE)
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/ldclient-rb/impl/data_store/store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ def get_data_store_status_provider
private def send_change_events(affected_items)
affected_items.each do |item|
if item[:kind] == FEATURES
@flag_change_broadcaster.broadcast(item[:key])
@flag_change_broadcaster.broadcast(LaunchDarkly::Interfaces::FlagChange.new(item[:key]))
end
end
end
Expand Down
288 changes: 288 additions & 0 deletions lib/ldclient-rb/impl/integrations/test_data/test_data_source_v2.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,288 @@
require 'concurrent/atomics'
require 'ldclient-rb/impl/data_system'
require 'ldclient-rb/interfaces/data_system'
require 'ldclient-rb/util'
require 'thread'

module LaunchDarkly
module Impl
module Integrations
module TestData
#
# Internal implementation of both Initializer and Synchronizer protocols for TestDataV2.
#
# This component bridges the test data management in TestDataV2 with the FDv2 protocol
# interfaces. Each instance implements both Initializer and Synchronizer protocols
# and receives change notifications for dynamic updates.
#
class TestDataSourceV2
include LaunchDarkly::Interfaces::DataSystem::Initializer
include LaunchDarkly::Interfaces::DataSystem::Synchronizer

# @api private
#
# @param test_data [LaunchDarkly::Integrations::TestDataV2] the test data instance
#
def initialize(test_data)
@test_data = test_data
@closed = false
@update_queue = Queue.new
@lock = Mutex.new

# Always register for change notifications
@test_data.add_instance(self)
end

#
# Return the name of this data source.
#
# @return [String]
#
def name
'TestDataV2'
end

#
# Implementation of the Initializer.fetch method.
#
# Returns the current test data as a Basis for initial data loading.
#
# @param selector_store [LaunchDarkly::Interfaces::DataSystem::SelectorStore] Provides the Selector (unused for test data)
# @return [LaunchDarkly::Result] A Result containing either a Basis or an error message
#
def fetch(selector_store)
begin
@lock.synchronize do
if @closed
return LaunchDarkly::Result.fail('TestDataV2 source has been closed')
end

# Get all current flags and segments from test data
init_data = @test_data.make_init_data
version = @test_data.get_version

# Build a full transfer changeset
builder = LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.new
builder.start(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL)

# Add all flags to the changeset
init_data[:flags].each do |key, flag_data|
builder.add_put(
LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG,
key,
flag_data[:version] || 1,
flag_data
)
end

# Add all segments to the changeset
init_data[:segments].each do |key, segment_data|
builder.add_put(
LaunchDarkly::Interfaces::DataSystem::ObjectKind::SEGMENT,
key,
segment_data[:version] || 1,
segment_data
)
end

# Create selector for this version
selector = LaunchDarkly::Interfaces::DataSystem::Selector.new_selector(version.to_s, version)
change_set = builder.finish(selector)

basis = LaunchDarkly::Interfaces::DataSystem::Basis.new(change_set: change_set, persist: false, environment_id: nil)

LaunchDarkly::Result.success(basis)
end
rescue => e
LaunchDarkly::Result.fail("Error fetching test data: #{e.message}", e)
end
end

#
# Implementation of the Synchronizer.sync method.
#
# Yields updates as test data changes occur.
#
# @param selector_store [LaunchDarkly::Interfaces::DataSystem::SelectorStore] Provides the Selector (unused for test data)
# @yield [LaunchDarkly::Interfaces::DataSystem::Update] Yields Update objects as synchronization progresses
# @return [void]
#
def sync(selector_store)
# First yield initial data
initial_result = fetch(selector_store)
unless initial_result.success?
yield LaunchDarkly::Interfaces::DataSystem::Update.new(
state: LaunchDarkly::Interfaces::DataSource::Status::OFF,
error: LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(
LaunchDarkly::Interfaces::DataSource::ErrorInfo::STORE_ERROR,
0,
initial_result.error,
Time.now
)
)
return
end

# Yield the initial successful state
yield LaunchDarkly::Interfaces::DataSystem::Update.new(
state: LaunchDarkly::Interfaces::DataSource::Status::VALID,
change_set: initial_result.value.change_set
)

# Continue yielding updates as they arrive
until @closed
begin
# stop() will push nil to the queue to wake us up when shutting down
update = @update_queue.pop
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@keelerm84 Ruby doesn't have a pop with timeout like we do in Python. The stop method will push nil into the queue to stop it from blocking so I'm not sure it is needed but I wanted to call it out.


# Handle nil sentinel for shutdown
break if update.nil?

# Yield the actual update
yield update
rescue => e
yield LaunchDarkly::Interfaces::DataSystem::Update.new(
state: LaunchDarkly::Interfaces::DataSource::Status::OFF,
error: LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(
LaunchDarkly::Interfaces::DataSource::ErrorInfo::UNKNOWN,
0,
"Error in test data synchronizer: #{e.message}",
Time.now
)
)
break
end
end
end

#
# Stop the data source and clean up resources
#
# @return [void]
#
def stop
@lock.synchronize do
return if @closed
@closed = true
end

@test_data.closed_instance(self)
# Signal shutdown to sync generator
@update_queue.push(nil)
end

#
# Called by TestDataV2 when a flag is updated.
#
# This method converts the flag update into an FDv2 changeset and
# queues it for delivery through the sync() generator.
#
# @param flag_data [Hash] the flag data
# @return [void]
#
def upsert_flag(flag_data)
@lock.synchronize do
return if @closed

begin
version = @test_data.get_version

# Build a changes transfer changeset
builder = LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.new
builder.start(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_CHANGES)

# Add the updated flag
builder.add_put(
LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG,
flag_data[:key],
flag_data[:version] || 1,
flag_data
)

# Create selector for this version
selector = LaunchDarkly::Interfaces::DataSystem::Selector.new_selector(version.to_s, version)
change_set = builder.finish(selector)

# Queue the update
update = LaunchDarkly::Interfaces::DataSystem::Update.new(
state: LaunchDarkly::Interfaces::DataSource::Status::VALID,
change_set: change_set
)

@update_queue.push(update)
rescue => e
# Queue an error update
error_update = LaunchDarkly::Interfaces::DataSystem::Update.new(
state: LaunchDarkly::Interfaces::DataSource::Status::OFF,
error: LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(
LaunchDarkly::Interfaces::DataSource::ErrorInfo::STORE_ERROR,
0,
"Error processing flag update: #{e.message}",
Time.now
)
)
@update_queue.push(error_update)
end
end
end

#
# Called by TestDataV2 when a segment is updated.
#
# This method converts the segment update into an FDv2 changeset and
# queues it for delivery through the sync() generator.
#
# @param segment_data [Hash] the segment data
# @return [void]
#
def upsert_segment(segment_data)
@lock.synchronize do
return if @closed

begin
version = @test_data.get_version

# Build a changes transfer changeset
builder = LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.new
builder.start(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_CHANGES)

# Add the updated segment
builder.add_put(
LaunchDarkly::Interfaces::DataSystem::ObjectKind::SEGMENT,
segment_data[:key],
segment_data[:version] || 1,
segment_data
)

# Create selector for this version
selector = LaunchDarkly::Interfaces::DataSystem::Selector.new_selector(version.to_s, version)
change_set = builder.finish(selector)

# Queue the update
update = LaunchDarkly::Interfaces::DataSystem::Update.new(
state: LaunchDarkly::Interfaces::DataSource::Status::VALID,
change_set: change_set
)

@update_queue.push(update)
rescue => e
# Queue an error update
error_update = LaunchDarkly::Interfaces::DataSystem::Update.new(
state: LaunchDarkly::Interfaces::DataSource::Status::OFF,
error: LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(
LaunchDarkly::Interfaces::DataSource::ErrorInfo::STORE_ERROR,
0,
"Error processing segment update: #{e.message}",
Time.now
)
)
@update_queue.push(error_update)
end
end
end
end
end
end
end
end

Loading