Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ def __init__(
timeout_seconds=60,
min_send_frequency_seconds: Optional[float] = None,
auth_config=None,
max_queued_responses_per_endpoint: int = 1000,
max_ipc_queue_size: int = -1,
is_ipc_blocking: bool = True,
data_reader_sink_is_lossy: bool = True,
):
"""Initializes a fake runtime initializer.

Expand All @@ -54,6 +58,10 @@ def __init__(
timeout_seconds: Timeout in seconds.
min_send_frequency_seconds: Minimum send frequency in seconds.
auth_config: Fake auth configuration.
max_queued_responses_per_endpoint: Fake max queued responses.
max_ipc_queue_size: Fake max IPC queue size.
is_ipc_blocking: Fake IPC blocking flag.
data_reader_sink_is_lossy: Fake lossy flag for data reader sink.
"""
# Store the string, but also prepare the enum
if service_type_str == "Server":
Expand All @@ -64,12 +72,18 @@ def __init__(
raise ValueError(f"Invalid service_type_str: {service_type_str}")

# This is what RuntimeConfig would store if initialized directly with an enum
# These need to be set for RuntimeConfig's cloning/property access logic
self._RuntimeConfig__service_type = self.__service_type_enum_val

self.data_aggregator_client = data_aggregator_client
self.timeout_seconds = timeout_seconds
self.auth_config = auth_config
self.min_send_frequency_seconds = min_send_frequency_seconds
self._RuntimeConfig__data_aggregator_client = data_aggregator_client
self._RuntimeConfig__timeout_seconds = timeout_seconds
self._RuntimeConfig__auth_config = auth_config
self._RuntimeConfig__min_send_frequency_seconds = min_send_frequency_seconds
self._RuntimeConfig__max_queued_responses_per_endpoint = (
max_queued_responses_per_endpoint
)
self._RuntimeConfig__max_ipc_queue_size = max_ipc_queue_size
self._RuntimeConfig__is_ipc_blocking = is_ipc_blocking
self._RuntimeConfig__data_reader_sink_is_lossy = data_reader_sink_is_lossy

# Attributes/methods that might be called by the class under test or its collaborators
self.create_called = False
Expand All @@ -83,7 +97,39 @@ def create(self, thread_watcher, data_handler, grpc_channel_factory):

@property
def service_type_enum(self):
return self.__service_type_enum_val
return self._RuntimeConfig__service_type

@property
def data_aggregator_client(self):
return self._RuntimeConfig__data_aggregator_client

@property
def timeout_seconds(self):
return self._RuntimeConfig__timeout_seconds

@property
def auth_config(self):
return self._RuntimeConfig__auth_config

@property
def min_send_frequency_seconds(self):
return self._RuntimeConfig__min_send_frequency_seconds

@property
def max_queued_responses_per_endpoint(self):
return self._RuntimeConfig__max_queued_responses_per_endpoint

@property
def max_ipc_queue_size(self):
return self._RuntimeConfig__max_ipc_queue_size

@property
def is_ipc_blocking(self):
return self._RuntimeConfig__is_ipc_blocking

@property
def data_reader_sink_is_lossy(self):
return self._RuntimeConfig__data_reader_sink_is_lossy


@pytest.fixture
Expand Down
63 changes: 55 additions & 8 deletions tsercom/api/local_process/local_runtime_factory_unittest.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ def __init__(
timeout_seconds=60,
min_send_frequency_seconds: Optional[float] = None,
auth_config=None,
max_queued_responses_per_endpoint: int = 1000,
max_ipc_queue_size: int = -1,
is_ipc_blocking: bool = True,
data_reader_sink_is_lossy: bool = True,
): # Added params
"""Initializes a fake runtime initializer.

Expand All @@ -32,26 +36,69 @@ def __init__(
timeout_seconds: Timeout in seconds.
min_send_frequency_seconds: Minimum send frequency in seconds.
auth_config: Fake auth configuration.
max_queued_responses_per_endpoint: Fake max queued responses.
max_ipc_queue_size: Fake max IPC queue size.
is_ipc_blocking: Fake IPC blocking flag.
data_reader_sink_is_lossy: Fake lossy flag for data reader sink.
"""
if service_type_str == "Server":
self.__service_type_enum_val = ServiceType.SERVER
self.__service_type_enum_val_prop = ServiceType.SERVER
elif service_type_str == "Client":
self.__service_type_enum_val = ServiceType.CLIENT
self.__service_type_enum_val_prop = ServiceType.CLIENT
else:
raise ValueError(f"Invalid service_type_str: {service_type_str}")

self._RuntimeConfig__service_type = self.__service_type_enum_val
self.data_aggregator_client = data_aggregator_client
self.timeout_seconds = timeout_seconds
self.auth_config = auth_config
self.min_send_frequency_seconds = min_send_frequency_seconds
self._RuntimeConfig__service_type = self.__service_type_enum_val_prop
self._RuntimeConfig__data_aggregator_client = data_aggregator_client
self._RuntimeConfig__timeout_seconds = timeout_seconds
self._RuntimeConfig__auth_config = auth_config
self._RuntimeConfig__min_send_frequency_seconds = min_send_frequency_seconds
self._RuntimeConfig__max_queued_responses_per_endpoint = (
max_queued_responses_per_endpoint
)
self._RuntimeConfig__max_ipc_queue_size = max_ipc_queue_size
self._RuntimeConfig__is_ipc_blocking = is_ipc_blocking
self._RuntimeConfig__data_reader_sink_is_lossy = data_reader_sink_is_lossy

self.create_called = False
self.create_args = None
self.runtime_to_return = FakeRuntime()

@property
def service_type_enum(self):
return self.__service_type_enum_val
return self._RuntimeConfig__service_type

@property
def data_aggregator_client(self):
return self._RuntimeConfig__data_aggregator_client

@property
def timeout_seconds(self):
return self._RuntimeConfig__timeout_seconds

@property
def auth_config(self):
return self._RuntimeConfig__auth_config

@property
def min_send_frequency_seconds(self):
return self._RuntimeConfig__min_send_frequency_seconds

@property
def max_queued_responses_per_endpoint(self):
return self._RuntimeConfig__max_queued_responses_per_endpoint

@property
def max_ipc_queue_size(self):
return self._RuntimeConfig__max_ipc_queue_size

@property
def is_ipc_blocking(self):
return self._RuntimeConfig__is_ipc_blocking

@property
def data_reader_sink_is_lossy(self):
return self._RuntimeConfig__data_reader_sink_is_lossy

def create(self, thread_watcher, data_handler, grpc_channel_factory):
self.create_called = True
Expand Down
7 changes: 5 additions & 2 deletions tsercom/api/runtime_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ def __init__(
in-process runtimes. If `None`, a default instance is created.
split_runtime_factory_factory: An optional factory for creating
out-of-process (split) runtimes. If `None`, a default instance
is created.
is created, which will use IPC settings from the RuntimeConfig
of the initializers it processes.
process_creator: An optional helper for creating new processes,
primarily for testing. If `None`, a default `ProcessCreator`
is used.
Expand Down Expand Up @@ -157,7 +158,9 @@ def __init__(
)
)
self.__split_runtime_factory_factory = SplitRuntimeFactoryFactory(
default_split_factory_thread_pool, self.__thread_watcher
thread_pool=default_split_factory_thread_pool,
thread_watcher=self.__thread_watcher,
# IPC settings will be derived from RuntimeInitializer by SRFF
)

# Initialize ProcessCreator with the context from split_runtime_factory_factory
Expand Down
4 changes: 3 additions & 1 deletion tsercom/api/split_process/remote_runtime_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ def _remote_data_reader(
# Note: Base `RuntimeFactory` expects RemoteDataReader[AnnotatedInstance[DataTypeT]].
# DataReaderSink is compatible.
if self.__data_reader_sink is None:
self.__data_reader_sink = DataReaderSink(self.__data_reader_queue)
self.__data_reader_sink = DataReaderSink(
self.__data_reader_queue, is_lossy=self.data_reader_sink_is_lossy
)
return self.__data_reader_sink

def _event_poller(
Expand Down
69 changes: 59 additions & 10 deletions tsercom/api/split_process/remote_runtime_factory_unittest.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ def __init__(
timeout_seconds=60,
min_send_frequency_seconds: Optional[float] = None,
auth_config=None,
max_queued_responses_per_endpoint: int = 1000,
max_ipc_queue_size: int = -1,
is_ipc_blocking: bool = True,
data_reader_sink_is_lossy: bool = True,
):
"""Initializes a fake runtime initializer.

Expand All @@ -47,22 +51,31 @@ def __init__(
timeout_seconds: Timeout in seconds.
min_send_frequency_seconds: Minimum send frequency in seconds.
auth_config: Fake auth configuration.
max_queued_responses_per_endpoint: Fake max queued responses.
max_ipc_queue_size: Fake max IPC queue size.
is_ipc_blocking: Fake IPC blocking flag.
data_reader_sink_is_lossy: Fake lossy flag for data reader sink.
"""
# Store the string, but also prepare the enum
if service_type_str == "Server":
self.__service_type_enum_val = ServiceType.SERVER
self.__service_type_enum_val_prop = ServiceType.SERVER
elif service_type_str == "Client":
self.__service_type_enum_val = ServiceType.CLIENT
self.__service_type_enum_val_prop = ServiceType.CLIENT
else:
raise ValueError(f"Invalid service_type_str: {service_type_str}")

# This is what RuntimeConfig would store if initialized directly with an enum
self._RuntimeConfig__service_type = self.__service_type_enum_val

self.data_aggregator_client = data_aggregator_client
self.timeout_seconds = timeout_seconds
self.auth_config = auth_config
self.min_send_frequency_seconds = min_send_frequency_seconds
self._RuntimeConfig__service_type = self.__service_type_enum_val_prop
self._RuntimeConfig__data_aggregator_client = data_aggregator_client
self._RuntimeConfig__timeout_seconds = timeout_seconds
self._RuntimeConfig__auth_config = auth_config
self._RuntimeConfig__min_send_frequency_seconds = min_send_frequency_seconds
self._RuntimeConfig__max_queued_responses_per_endpoint = (
max_queued_responses_per_endpoint
)
self._RuntimeConfig__max_ipc_queue_size = max_ipc_queue_size
self._RuntimeConfig__is_ipc_blocking = is_ipc_blocking
self._RuntimeConfig__data_reader_sink_is_lossy = data_reader_sink_is_lossy

self.create_called_with = None
self.create_call_count = 0
Expand Down Expand Up @@ -92,7 +105,39 @@ def create(

@property
def service_type_enum(self):
return self.__service_type_enum_val
return self._RuntimeConfig__service_type

@property
def data_aggregator_client(self):
return self._RuntimeConfig__data_aggregator_client

@property
def timeout_seconds(self):
return self._RuntimeConfig__timeout_seconds

@property
def auth_config(self):
return self._RuntimeConfig__auth_config

@property
def min_send_frequency_seconds(self):
return self._RuntimeConfig__min_send_frequency_seconds

@property
def max_queued_responses_per_endpoint(self):
return self._RuntimeConfig__max_queued_responses_per_endpoint

@property
def max_ipc_queue_size(self):
return self._RuntimeConfig__max_ipc_queue_size

@property
def is_ipc_blocking(self):
return self._RuntimeConfig__is_ipc_blocking

@property
def data_reader_sink_is_lossy(self):
return self._RuntimeConfig__data_reader_sink_is_lossy


class FakeMultiprocessQueueSource:
Expand Down Expand Up @@ -155,8 +200,10 @@ def clear_instances(cls):
class FakeDataReaderSink:
_instances = []

def __init__(self, data_reader_queue_sink):
# Updated to accept is_lossy
def __init__(self, data_reader_queue_sink, is_lossy=True):
self.data_reader_queue_sink = data_reader_queue_sink
self.is_lossy_param = is_lossy # Store for assertion
FakeDataReaderSink._instances.append(self)

@classmethod
Expand Down Expand Up @@ -397,6 +444,8 @@ def test_create_method(
data_reader_instance.data_reader_queue_sink
is factory._RemoteRuntimeFactory__data_reader_queue
)
# Check the is_lossy flag passed to DataReaderSink
assert data_reader_instance.is_lossy_param == factory.data_reader_sink_is_lossy
assert factory._RemoteRuntimeFactory__data_reader_sink is data_reader_instance

# Assert FakeRuntimeCommandSource interactions
Expand Down
12 changes: 10 additions & 2 deletions tsercom/api/split_process/split_runtime_factory_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,16 @@ def _create_pair(
mp_context = self.__mp_context_provider.context
queue_factory_instance = self.__mp_context_provider.queue_factory

event_sink, event_source = queue_factory_instance.create_queues()
data_sink, data_source = queue_factory_instance.create_queues()
max_ipc_q_size = initializer.max_ipc_queue_size
is_ipc_block = initializer.is_ipc_blocking
event_sink, event_source = queue_factory_instance.create_queues(
max_ipc_queue_size=max_ipc_q_size,
is_ipc_blocking=is_ipc_block,
)
data_sink, data_source = queue_factory_instance.create_queues(
max_ipc_queue_size=max_ipc_q_size,
is_ipc_blocking=is_ipc_block,
)

# Command queues use a Default factory but with the context derived from the provider,
# ensuring consistency if the main context is, for example, PyTorch-specific.
Expand Down
Loading
Loading