From 98e1500de98cd508cb8089d96c5fb1523240e53c Mon Sep 17 00:00:00 2001 From: Richard Zadorozny Date: Tue, 20 Jan 2026 13:31:01 -0500 Subject: [PATCH 1/2] Support multiple client connections --- .../AndroidApplicationConventionPlugin.kt | 6 - .../android/AndroidLibraryConventionPlugin.kt | 7 - snapo-link-android/gradle.properties | 6 +- snapo-link-android/gradle/libs.versions.toml | 6 +- .../gradle/wrapper/gradle-wrapper.properties | 2 +- .../snapo/link/core/SnapOInitProvider.kt | 1 - .../openai/snapo/link/core/SnapOLinkConfig.kt | 3 - .../snapo/link/core/SnapOLinkContext.kt | 53 +++ .../snapo/link/core/SnapOLinkFeature.kt | 39 +- .../snapo/link/core/SnapOLinkRegistry.kt | 37 +- .../openai/snapo/link/core/SnapOLinkServer.kt | 399 +++++------------- .../snapo/link/core/SnapOLinkSession.kt | 388 +++++++++++++++++ .../SnapOHttpUrlInterceptor.kt | 147 ++++--- .../snapo/network/NetworkInspectorFeature.kt | 91 ++-- 14 files changed, 740 insertions(+), 445 deletions(-) create mode 100644 snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOLinkContext.kt create mode 100644 snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOLinkSession.kt diff --git a/snapo-link-android/build-logic/src/main/kotlin/com/openai/snapo/buildlogic/android/AndroidApplicationConventionPlugin.kt b/snapo-link-android/build-logic/src/main/kotlin/com/openai/snapo/buildlogic/android/AndroidApplicationConventionPlugin.kt index bbdd83d..997972c 100644 --- a/snapo-link-android/build-logic/src/main/kotlin/com/openai/snapo/buildlogic/android/AndroidApplicationConventionPlugin.kt +++ b/snapo-link-android/build-logic/src/main/kotlin/com/openai/snapo/buildlogic/android/AndroidApplicationConventionPlugin.kt @@ -6,13 +6,10 @@ import org.gradle.api.Plugin import org.gradle.api.Project import org.gradle.kotlin.dsl.configure import org.gradle.kotlin.dsl.getByType -import org.jetbrains.kotlin.gradle.dsl.JvmTarget -import org.jetbrains.kotlin.gradle.dsl.KotlinAndroidProjectExtension class AndroidApplicationConventionPlugin : Plugin { override fun apply(target: Project) { target.pluginManager.apply("com.android.application") - target.pluginManager.apply("org.jetbrains.kotlin.android") val extension = target.extensions.getByType() extension.apply { @@ -37,8 +34,5 @@ class AndroidApplicationConventionPlugin : Plugin { } } - target.extensions.configure { - compilerOptions.jvmTarget.set(JvmTarget.JVM_11) - } } } diff --git a/snapo-link-android/build-logic/src/main/kotlin/com/openai/snapo/buildlogic/android/AndroidLibraryConventionPlugin.kt b/snapo-link-android/build-logic/src/main/kotlin/com/openai/snapo/buildlogic/android/AndroidLibraryConventionPlugin.kt index 67ce215..9c0926d 100644 --- a/snapo-link-android/build-logic/src/main/kotlin/com/openai/snapo/buildlogic/android/AndroidLibraryConventionPlugin.kt +++ b/snapo-link-android/build-logic/src/main/kotlin/com/openai/snapo/buildlogic/android/AndroidLibraryConventionPlugin.kt @@ -9,13 +9,10 @@ import org.gradle.api.publish.maven.MavenPublication import org.gradle.kotlin.dsl.configure import org.gradle.kotlin.dsl.create import org.gradle.kotlin.dsl.getByType -import org.jetbrains.kotlin.gradle.dsl.JvmTarget -import org.jetbrains.kotlin.gradle.dsl.KotlinAndroidProjectExtension class AndroidLibraryConventionPlugin : Plugin { override fun apply(target: Project) { target.pluginManager.apply("com.android.library") - target.pluginManager.apply("org.jetbrains.kotlin.android") target.pluginManager.apply("maven-publish") val extension = target.extensions.getByType() @@ -44,10 +41,6 @@ class AndroidLibraryConventionPlugin : Plugin { } } - target.extensions.configure { - compilerOptions.jvmTarget.set(JvmTarget.JVM_11) - } - target.afterEvaluate { val releaseComponent = components.findByName("release") ?: return@afterEvaluate diff --git a/snapo-link-android/gradle.properties b/snapo-link-android/gradle.properties index 7877eed..3727387 100644 --- a/snapo-link-android/gradle.properties +++ b/snapo-link-android/gradle.properties @@ -23,4 +23,8 @@ kotlin.code.style=official android.nonTransitiveRClass=true GROUP=com.openai.snapo -VERSION_FILE=../VERSION \ No newline at end of file +VERSION_FILE=../VERSION +android.uniquePackageNames=false +android.dependency.useConstraints=true +android.generateSyncIssueWhenLibraryConstraintsAreEnabled=false +android.r8.strictFullModeForKeepRules=false diff --git a/snapo-link-android/gradle/libs.versions.toml b/snapo-link-android/gradle/libs.versions.toml index c77b10a..4d7791c 100644 --- a/snapo-link-android/gradle/libs.versions.toml +++ b/snapo-link-android/gradle/libs.versions.toml @@ -1,7 +1,7 @@ [versions] -agp = "8.13.2" +agp = "9.0.0" detektGradlePlugin = "1.23.8" -gradle = "8.13.2" +gradle = "9.0.0" kotlin = "2.3.0" coroutines = "1.10.2" kotlinGradlePlugin = "2.2.20" @@ -9,7 +9,7 @@ serialization = "1.9.0" coreKtx = "1.17.0" lifecycleRuntimeKtx = "2.10.0" activityCompose = "1.12.2" -composeBom = "2025.12.01" +composeBom = "2026.01.00" okhttpBom = "5.3.2" ktor = "3.3.3" detekt = "1.23.8" diff --git a/snapo-link-android/gradle/wrapper/gradle-wrapper.properties b/snapo-link-android/gradle/wrapper/gradle-wrapper.properties index 94d44f4..90b338d 100644 --- a/snapo-link-android/gradle/wrapper/gradle-wrapper.properties +++ b/snapo-link-android/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ #Wed Sep 24 11:05:57 EDT 2025 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-8.13-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-9.1.0-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOInitProvider.kt b/snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOInitProvider.kt index ceab7e3..0342bc3 100644 --- a/snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOInitProvider.kt +++ b/snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOInitProvider.kt @@ -45,7 +45,6 @@ class SnapOInitProvider : ContentProvider() { val allowRelease = meta?.getBoolean("snapo.allow_release", false) ?: false val linkConfig = SnapOLinkConfig( - singleClientOnly = true, modeLabel = modeLabel, allowRelease = allowRelease, ) diff --git a/snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOLinkConfig.kt b/snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOLinkConfig.kt index 9e34381..920dce5 100644 --- a/snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOLinkConfig.kt +++ b/snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOLinkConfig.kt @@ -1,9 +1,6 @@ package com.openai.snapo.link.core data class SnapOLinkConfig( - /** Single-client policy keeps ordering simple. */ - val singleClientOnly: Boolean = true, - /** For the Hello record; reflect your current redaction mode/prefs. */ val modeLabel: String = "safe", // or "unredacted" diff --git a/snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOLinkContext.kt b/snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOLinkContext.kt new file mode 100644 index 0000000..8cf2328 --- /dev/null +++ b/snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOLinkContext.kt @@ -0,0 +1,53 @@ +package com.openai.snapo.link.core + +import android.app.ActivityManager +import android.app.Application +import android.os.Process +import android.os.SystemClock + +internal class SnapOLinkContext( + private val app: Application, + private val config: SnapOLinkConfig, + private val appIconProvider: AppIconProvider = AppIconProvider(app), + featureSinkProvider: (String) -> LinkEventSink, + private val serverStartWallMs: Long = System.currentTimeMillis(), + private val serverStartMonoNs: Long = SystemClock.elapsedRealtimeNanos(), +) { + @Volatile + private var latestAppIcon: AppIcon? = null + + init { + SnapOLinkRegistry.bindSinkProvider(featureSinkProvider) + } + + fun buildHello(): Hello = + Hello( + packageName = app.packageName, + processName = appProcessName(), + pid = Process.myPid(), + serverStartWallMs = serverStartWallMs, + serverStartMonoNs = serverStartMonoNs, + mode = config.modeLabel, + features = SnapOLinkRegistry.snapshot().map { LinkFeatureInfo(it.featureId) }, + ) + + fun latestAppIcon(): AppIcon? = latestAppIcon + + fun snapshotFeatures(): List = SnapOLinkRegistry.snapshot() + + fun loadAppIconIfAvailable(): AppIcon? { + val iconEvent = appIconProvider.loadAppIcon() ?: return null + latestAppIcon = iconEvent + return iconEvent + } + + private fun appProcessName(): String { + return try { + val am = app.getSystemService(Application.ACTIVITY_SERVICE) as ActivityManager + val pid = Process.myPid() + am.runningAppProcesses?.firstOrNull { it.pid == pid }?.processName ?: app.packageName + } catch (_: Throwable) { + app.packageName + } + } +} diff --git a/snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOLinkFeature.kt b/snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOLinkFeature.kt index 87fa6c8..a9109db 100644 --- a/snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOLinkFeature.kt +++ b/snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOLinkFeature.kt @@ -7,20 +7,39 @@ interface SnapOLinkFeature { /** Stable identifier used for feature envelopes. */ val featureId: String - suspend fun onClientConnected(sink: LinkEventSink) - suspend fun onFeatureOpened() - fun onClientDisconnected() + /** Called once when the link server is available so features can broadcast or target clients. */ + fun onLinkAvailable(sink: LinkEventSink) {} + + /** Invoked when a specific client opens this feature. */ + suspend fun onFeatureOpened(clientId: Long) + + /** Invoked when a client disconnects. */ + fun onClientDisconnected(clientId: Long) {} } -interface LinkEventSink { - suspend fun sendHighPriority(payload: T, serializer: SerializationStrategy) - suspend fun sendLowPriority(payload: T, serializer: SerializationStrategy) +sealed interface ClientId { + data object All : ClientId + data class Specific(val value: Long) : ClientId } -suspend inline fun LinkEventSink.sendHighPriority(payload: T) { - sendHighPriority(payload, serializer()) +enum class EventPriority { + High, + Low, +} + +interface LinkEventSink { + fun send( + payload: T, + serializer: SerializationStrategy, + clientId: ClientId = ClientId.All, + priority: EventPriority = EventPriority.High, + ) } -suspend inline fun LinkEventSink.sendLowPriority(payload: T) { - sendLowPriority(payload, serializer()) +inline fun LinkEventSink.send( + payload: T, + clientId: ClientId = ClientId.All, + priority: EventPriority = EventPriority.High, +) { + send(payload, serializer(), clientId, priority) } diff --git a/snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOLinkRegistry.kt b/snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOLinkRegistry.kt index 2b16d6e..e640de8 100644 --- a/snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOLinkRegistry.kt +++ b/snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOLinkRegistry.kt @@ -2,11 +2,42 @@ package com.openai.snapo.link.core object SnapOLinkRegistry { private val lock = Any() - private val features = LinkedHashSet() + private val features = LinkedHashMap() + private val linkedFeatureIds = HashSet() + private var sinkProvider: ((String) -> LinkEventSink)? = null fun register(feature: SnapOLinkFeature) { - synchronized(lock) { features.add(feature) } + val (storedFeature, provider) = synchronized(lock) { + val existing = features.putIfAbsent(feature.featureId, feature) + Pair(existing ?: feature, sinkProvider) + } + if (provider != null) { + linkFeatureIfNeeded(storedFeature, provider) + } } - fun snapshot(): List = synchronized(lock) { features.toList() } + internal fun bindSinkProvider(provider: (String) -> LinkEventSink) { + val snapshot = synchronized(lock) { + sinkProvider = provider + features.values.toList() + } + snapshot.forEach { linkFeatureIfNeeded(it, provider) } + } + + fun snapshot(): List = + synchronized(lock) { features.values.toList() } + + private fun linkFeatureIfNeeded( + feature: SnapOLinkFeature, + provider: (String) -> LinkEventSink, + ) { + val shouldLink = synchronized(lock) { + if (linkedFeatureIds.contains(feature.featureId)) return@synchronized false + linkedFeatureIds.add(feature.featureId) + true + } + if (shouldLink) { + feature.onLinkAvailable(provider(feature.featureId)) + } + } } diff --git a/snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOLinkServer.kt b/snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOLinkServer.kt index c1d3a56..d635e0e 100644 --- a/snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOLinkServer.kt +++ b/snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOLinkServer.kt @@ -6,34 +6,20 @@ import android.net.LocalServerSocket import android.net.LocalSocket import android.net.LocalSocketAddress import android.os.Process -import android.os.SystemClock import android.util.Log import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.currentCoroutineContext -import kotlinx.coroutines.delay -import kotlinx.coroutines.isActive import kotlinx.coroutines.launch -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock import kotlinx.serialization.SerializationStrategy -import kotlinx.serialization.decodeFromString -import kotlinx.serialization.serializer -import java.io.BufferedReader -import java.io.BufferedWriter -import java.io.ByteArrayOutputStream import java.io.Closeable -import java.io.IOException -import java.io.InputStreamReader -import java.io.OutputStreamWriter -import java.net.SocketTimeoutException -import java.nio.charset.StandardCharsets +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicLong import kotlin.coroutines.cancellation.CancellationException /** - * App-side server that accepts a single desktop client and delegates streaming to features. + * App-side server that accepts multiple desktop clients and delegates streaming to features. * * Transport: * ABSTRACT local UNIX domain socket → adb forward tcp:PORT localabstract:snapo_server_$pid @@ -57,21 +43,17 @@ class SnapOLinkServer( @Volatile private var writerJob: Job? = null - @Volatile - private var connectedSink: BufferedWriter? = null - - @Volatile - private var lastHighPriorityEmissionMs: Long = 0L - - private val writerLock = Mutex() - private var attachedFeatures: List = emptyList() - - @Volatile - private var latestAppIcon: AppIcon? = null - private val appIconProvider = AppIconProvider(app) - - private val serverStartWallMs = System.currentTimeMillis() - private val serverStartMonoNs = android.os.SystemClock.elapsedRealtimeNanos() + private val sessionsGuard = Any() + private val sessions = LinkedHashMap() + private val sessionIdCounter = AtomicLong(1L) + private val linkContext by lazy { + SnapOLinkContext( + app = app, + config = config, + featureSinkProvider = { featureId -> sinkFor(featureId) }, + ) + } + private val featureSinks = ConcurrentHashMap() fun start() { if (!config.allowRelease && @@ -101,7 +83,7 @@ class SnapOLinkServer( override fun close() { writerJob?.cancel() writerJob = null - cleanupActiveConnection() + snapshotSessions().forEach { it.close() } try { server?.close() } catch (_: Throwable) { @@ -111,14 +93,14 @@ class SnapOLinkServer( // ---- internals ---- - private suspend fun acceptLoop(server: LocalServerSocket) { + private fun acceptLoop(server: LocalServerSocket) { while (isActiveSafe()) { val socket = acceptSocketOrNull(server) ?: continue - handleAcceptedSocket(socket) + scope.launch(Dispatchers.IO) { handleAcceptedSocket(socket) } } } - private suspend fun acceptSocketOrNull(server: LocalServerSocket): LocalSocket? = + private fun acceptSocketOrNull(server: LocalServerSocket): LocalSocket? = try { server.accept() } catch (ce: CancellationException) { @@ -128,30 +110,25 @@ class SnapOLinkServer( } private suspend fun handleAcceptedSocket(socket: LocalSocket) { - val proceed = try { - processHandshake(socket) && !refuseAdditionalClientIfNeeded(socket) - } catch (ce: CancellationException) { - throw ce - } catch (_: Throwable) { - false + val session = createSession(socket) + registerSession(session) + session.setOnCloseListener { closedSession -> + unregisterSession(closedSession) } - try { - if (proceed) { - val writer = attachClient(socket) - writerLock.withLock { - writeHandshake(writer) - } - attachFeatures() - sendHighPriorityRecord(ReplayComplete()) - tailClient(socket) + when (val result = session.run()) { + is ClientHandshakeResult.Accepted -> Unit + is ClientHandshakeResult.Rejected -> Log.w( + TAG, + "Rejected client connection for ${result.reason}" + ) } } catch (ce: CancellationException) { throw ce } catch (_: Throwable) { // swallow and continue accept loop } finally { - cleanupActiveConnection() + session.close() try { socket.close() } catch (_: Throwable) { @@ -159,274 +136,112 @@ class SnapOLinkServer( } } - private fun processHandshake(socket: LocalSocket): Boolean { - return when (val handshakeResult = performClientHandshake(socket)) { - is ClientHandshakeResult.Accepted -> true - is ClientHandshakeResult.Rejected -> { - Log.w(TAG, "Rejected client connection for ${handshakeResult.reason}") - try { - socket.close() - } catch (_: Throwable) { - } - false - } - } - } - - private fun refuseAdditionalClientIfNeeded(socket: LocalSocket): Boolean { - if (!config.singleClientOnly || connectedSink == null) { - return false - } - - socket.use { s -> - val tmpWriter = BufferedWriter( - OutputStreamWriter( - s.outputStream, - StandardCharsets.UTF_8 - ) - ) - writeHandshake(tmpWriter) - tmpWriter.write(Ndjson.encodeToString(ReplayComplete())) - tmpWriter.write("\n") - tmpWriter.flush() - } - return true - } - - private fun attachClient(socket: LocalSocket): BufferedWriter { - val sink = BufferedWriter(OutputStreamWriter(socket.outputStream, StandardCharsets.UTF_8)) - connectedSink = sink - return sink - } + private fun createSession(socket: LocalSocket): SnapOLinkSession = + SnapOLinkSession( + sessionIdCounter.getAndIncrement(), + socket, + linkContext, + scope, + ) - private suspend fun attachFeatures() { - val features = SnapOLinkRegistry.snapshot() - val attached = ArrayList(features.size) - for (feature in features) { - val featureSink = FeatureWrappingSink(feature.featureId) - feature.onClientConnected(featureSink) - attached += AttachedFeature(feature = feature, sink = featureSink) + private fun registerSession(session: SnapOLinkSession) { + synchronized(sessionsGuard) { + sessions[session.id] = session } - attachedFeatures = attached } - private suspend fun tailClient(socket: LocalSocket) { - val reader = BufferedReader(InputStreamReader(socket.inputStream, StandardCharsets.UTF_8)) - while (true) { - val line = try { - reader.readLine() - } catch (_: Throwable) { - null - } - if (line == null) break - val trimmed = line.trimEnd('\r') - if (trimmed.isNotEmpty()) { - handleHostMessage(trimmed) - } + private fun unregisterSession(session: SnapOLinkSession) { + synchronized(sessionsGuard) { + sessions.remove(session.id) } } - private fun cleanupActiveConnection() { - val features = attachedFeatures - attachedFeatures = emptyList() - features.forEach { it.feature.onClientDisconnected() } - try { - connectedSink?.close() - } catch (_: Throwable) { - } - connectedSink = null - } + private fun snapshotSessions(): List = + synchronized(sessionsGuard) { sessions.values.toList() } - private fun appProcessName(): String { - return try { - val am = - app.getSystemService(Application.ACTIVITY_SERVICE) as android.app.ActivityManager - val pid = Process.myPid() - am.runningAppProcesses?.firstOrNull { it.pid == pid }?.processName ?: app.packageName - } catch (_: Throwable) { - app.packageName - } - } + private fun findSession(clientId: Long): SnapOLinkSession? = + synchronized(sessionsGuard) { sessions[clientId] } private fun isActiveSafe(): Boolean = (writerJob?.isActive != false) - private inner class FeatureWrappingSink( - private val featureId: String, - ) : LinkEventSink { - override suspend fun sendHighPriority(payload: T, serializer: SerializationStrategy) { - sendHighPriorityRecord(wrap(payload, serializer)) - } - - override suspend fun sendLowPriority(payload: T, serializer: SerializationStrategy) { - sendLowPriorityRecord(wrap(payload, serializer)) - } - - private fun wrap(payload: T, serializer: SerializationStrategy): LinkRecord { - val element = Ndjson.encodeToJsonElement(serializer, payload) - return FeatureEvent(feature = featureId, payload = element) - } - } - - private fun writeLine( - writer: BufferedWriter, - payload: LinkRecord, - ): Boolean { - try { - writer.write(Ndjson.encodeToString(LinkRecord.serializer(), payload)) - writer.write("\n") - writer.flush() - return true - } catch (_: Throwable) { - // connection likely dropped; accept loop will tidy up - try { - writer.close() - } catch (_: Throwable) { - } - if (connectedSink === writer) connectedSink = null - return false - } - } - - private suspend fun sendHighPriorityRecord(payload: LinkRecord) { - writerLock.withLock { - val writer = connectedSink ?: return - if (writeLine(writer, payload)) { - markHighPriorityEmission() - } - } - } - - private suspend fun sendLowPriorityRecord(payload: LinkRecord) { - val deferStart = SystemClock.elapsedRealtime() - while (currentCoroutineContext().isActive) { - if (connectedSink == null) return - if (hasRecentHighPriorityEmission() && - SystemClock.elapsedRealtime() - deferStart < MaxLowPriorityDeferMillis - ) { - delay(LowPriorityRetryDelayMillis) - continue - } - if (writerLock.tryLock()) { - val writer = connectedSink - if (writer == null) { - writerLock.unlock() - return - } - try { - writeLine(writer, payload) - } finally { - writerLock.unlock() - } - return + private fun sendHighPriorityRecordToAll(payload: LinkRecord) { + val snapshot = snapshotSessions() + snapshot.forEach { session -> + if (session.state != SnapOLinkSessionState.ACTIVE) return@forEach + if (!session.sendHighPriority(payload)) { + session.close() } - delay(LowPriorityRetryDelayMillis) } } - private fun markHighPriorityEmission() { - lastHighPriorityEmissionMs = SystemClock.elapsedRealtime() + private fun emitAppIconIfAvailable() { + val iconEvent = linkContext.loadAppIconIfAvailable() ?: return + streamAppIcon(iconEvent) } - private fun hasRecentHighPriorityEmission(): Boolean { - val last = lastHighPriorityEmissionMs - if (last == 0L) return false - return SystemClock.elapsedRealtime() - last < HighPriorityIdleThresholdMillis + private fun streamAppIcon(icon: AppIcon) { + sendHighPriorityRecordToAll(icon) } - private suspend fun emitAppIconIfAvailable() { - val iconEvent = appIconProvider.loadAppIcon() ?: return - latestAppIcon = iconEvent - streamAppIcon(iconEvent) - } + private fun sinkFor(featureId: String): LinkEventSink = + featureSinks.getOrPut(featureId) { FeatureEventSink(featureId) } - private fun performClientHandshake(socket: LocalSocket): ClientHandshakeResult { - return try { - val outcome = readClientHello(socket) - if (outcome == ClientHelloToken) { - ClientHandshakeResult.Accepted - } else { - ClientHandshakeResult.Rejected("unexpected handshake token") + private inner class FeatureEventSink( + private val featureId: String, + ) : LinkEventSink { + override fun send( + payload: T, + serializer: SerializationStrategy, + clientId: ClientId, + priority: EventPriority, + ) { + val record = wrap(payload, serializer) + when (clientId) { + ClientId.All -> sendToAll(record, priority) + is ClientId.Specific -> sendToClient(clientId.value, record, priority) } - } catch (_: SocketTimeoutException) { - ClientHandshakeResult.Rejected("handshake timeout") - } catch (ioe: IOException) { - ClientHandshakeResult.Rejected(ioe.localizedMessage ?: "handshake failure") } - } - private fun readClientHello(socket: LocalSocket): String { - socket.soTimeout = ClientHelloTimeoutMs - val input = socket.inputStream - val buffer = ByteArrayOutputStream() - try { - while (buffer.size() <= ClientHelloMaxBytes) { - val value = input.read() - if (value == -1) { - throw IOException("client handshake closed without data") - } - if (value == '\n'.code) { - val raw = buffer.toString(StandardCharsets.UTF_8.name()) - return raw.trimEnd('\r') + private fun sendToAll(record: LinkRecord, priority: EventPriority) { + val snapshot = snapshotSessions() + snapshot.forEach { session -> + if (session.state != SnapOLinkSessionState.ACTIVE) return@forEach + if (!session.isFeatureOpened(featureId)) return@forEach + if (!sendToSession(session, record, priority)) { + session.close() } - buffer.write(value) } - throw IOException("client handshake exceeded $ClientHelloMaxBytes bytes") - } finally { - socket.soTimeout = 0 } - } - private fun writeHandshake(writer: BufferedWriter) { - if ( - writeLine( - writer, - Hello( - packageName = app.packageName, - processName = appProcessName(), - pid = Process.myPid(), - serverStartWallMs = serverStartWallMs, - serverStartMonoNs = serverStartMonoNs, - mode = config.modeLabel, - features = SnapOLinkRegistry.snapshot().map { LinkFeatureInfo(it.featureId) }, - ), - ) + private fun sendToClient( + clientId: Long, + record: LinkRecord, + priority: EventPriority, ) { - markHighPriorityEmission() - } - - latestAppIcon?.let { icon -> - if (writeLine(writer, icon)) { - markHighPriorityEmission() + val session = findSession(clientId) ?: return + if (session.state != SnapOLinkSessionState.ACTIVE) return + if (!session.isFeatureOpened(featureId)) return + if (!sendToSession(session, record, priority)) { + session.close() } } - } - private suspend fun streamAppIcon(icon: AppIcon) { - writerLock.withLock { - val writer = connectedSink ?: return - if (writeLine(writer, icon)) { - markHighPriorityEmission() + private fun sendToSession( + session: SnapOLinkSession, + record: LinkRecord, + priority: EventPriority, + ): Boolean = + when (priority) { + EventPriority.High -> session.sendHighPriority(record) + EventPriority.Low -> session.sendLowPriority(record) } - } - } - private suspend fun handleHostMessage(rawLine: String) { - val message = try { - Ndjson.decodeFromString(HostMessage.serializer(), rawLine) - } catch (_: Throwable) { - return - } - when (message) { - is FeatureOpened -> handleFeatureOpened(message) + private fun wrap(payload: T, serializer: SerializationStrategy): LinkRecord { + val element = Ndjson.encodeToJsonElement(serializer, payload) + return FeatureEvent(feature = featureId, payload = element) } } - - private suspend fun handleFeatureOpened(message: FeatureOpened) { - val attached = attachedFeatures.firstOrNull { it.feature.featureId == message.feature } ?: return - attached.feature.onFeatureOpened() - } - companion object { fun start( application: Application, @@ -439,19 +254,3 @@ class SnapOLinkServer( } private const val TAG = "SnapOLink" -private const val ClientHelloToken = "HelloSnapO" -private const val ClientHelloTimeoutMs = 1_000 -private const val ClientHelloMaxBytes = 4 * 1024 -private const val HighPriorityIdleThresholdMillis = 150L -private const val LowPriorityRetryDelayMillis = 50L -private const val MaxLowPriorityDeferMillis = 2_000L - -private sealed interface ClientHandshakeResult { - object Accepted : ClientHandshakeResult - data class Rejected(val reason: String) : ClientHandshakeResult -} - -private data class AttachedFeature( - val feature: SnapOLinkFeature, - val sink: LinkEventSink, -) diff --git a/snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOLinkSession.kt b/snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOLinkSession.kt new file mode 100644 index 0000000..e0597d3 --- /dev/null +++ b/snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOLinkSession.kt @@ -0,0 +1,388 @@ +package com.openai.snapo.link.core + +import android.net.LocalSocket +import android.os.SystemClock +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.Job +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.launch +import kotlinx.coroutines.selects.onTimeout +import kotlinx.coroutines.selects.select +import kotlinx.serialization.decodeFromString +import java.io.BufferedReader +import java.io.BufferedWriter +import java.io.ByteArrayOutputStream +import java.io.IOException +import java.io.InputStreamReader +import java.io.OutputStreamWriter +import java.net.SocketTimeoutException +import java.nio.charset.StandardCharsets +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicBoolean + +internal class SnapOLinkSession( + val id: Long, + private val socket: LocalSocket, + private val context: SnapOLinkContext, + private val scope: CoroutineScope, +) { + private val writer = BufferedWriter(OutputStreamWriter(socket.outputStream, StandardCharsets.UTF_8)) + private val highPriorityQueue = Channel(capacity = HighPriorityQueueCapacity) + private val lowPriorityQueue = Channel(capacity = LowPriorityQueueCapacity) + + @Volatile + private var writerJob: Job? = null + + @Volatile + private var lastHighPriorityEmissionMs: Long = 0L + + @Volatile + var attachedFeatures: Map = emptyMap() + + @Volatile + private var sessionState: SnapOLinkSessionState = SnapOLinkSessionState.CONNECTING + + private val openedFeatures = ConcurrentHashMap.newKeySet() + + private val closed = AtomicBoolean(false) + + @Volatile + private var onCloseListener: ((SnapOLinkSession) -> Unit)? = null + + val state: SnapOLinkSessionState + get() = sessionState + + suspend fun run(): ClientHandshakeResult { + val handshake = performClientHandshake() + if (handshake is ClientHandshakeResult.Rejected) { + close() + return handshake + } + + if (!writeHandshake(context.buildHello(), context.latestAppIcon())) { + close() + return ClientHandshakeResult.Rejected("handshake write failure") + } + + sessionState = SnapOLinkSessionState.ACTIVE + attachFeatures(context.snapshotFeatures()) + startWriter() + + if (!sendHighPriority(ReplayComplete())) { + close() + return ClientHandshakeResult.Accepted + } + + try { + tailLines { line -> handleHostMessage(line) } + } finally { + close() + } + return ClientHandshakeResult.Accepted + } + + fun sendHighPriority(payload: LinkRecord): Boolean { + if (!isReady()) return false + val result = highPriorityQueue.trySend(payload) + if (result.isSuccess) return true + if (result.isClosed) return false + scope.launch { + try { + highPriorityQueue.send(payload) + } catch (_: Throwable) { + } + } + return true + } + + fun sendLowPriority(payload: LinkRecord): Boolean { + if (!isReady()) return false + val item = LowPriorityRecord(payload, SystemClock.elapsedRealtime()) + val result = lowPriorityQueue.trySend(item) + return result.isSuccess || !result.isClosed + } + + fun markClosed(): Boolean = closed.compareAndSet(false, true) + + fun isClosed(): Boolean = closed.get() + + fun close() { + if (!markClosed()) return + sessionState = SnapOLinkSessionState.CLOSED + writerJob?.cancel() + writerJob = null + highPriorityQueue.close() + lowPriorityQueue.close() + attachedFeatures.values.forEach { it.onClientDisconnected(id) } + attachedFeatures = emptyMap() + openedFeatures.clear() + closeQuietly() + onCloseListener?.invoke(this) + } + + private fun writeHandshake(hello: Hello, icon: AppIcon?): Boolean { + if (isClosed()) return false + if (!writeLine(hello)) return false + markHighPriorityEmission() + if (icon != null) { + if (!writeLine(icon)) return false + markHighPriorityEmission() + } + return true + } + + private suspend fun tailLines(onLine: suspend (String) -> Unit) { + val reader = BufferedReader(InputStreamReader(socket.inputStream, StandardCharsets.UTF_8)) + while (true) { + val line = try { + reader.readLine() + } catch (_: Throwable) { + null + } + if (line == null) break + val trimmed = line.trimEnd('\r') + if (trimmed.isNotEmpty()) { + onLine(trimmed) + } + } + } + + private fun attachFeatures(features: List) { + val attached = LinkedHashMap(features.size) + for (feature in features) { + attached[feature.featureId] = feature + } + attachedFeatures = attached + } + + private suspend fun handleHostMessage(rawLine: String) { + val message = try { + Ndjson.decodeFromString(HostMessage.serializer(), rawLine) + } catch (_: Throwable) { + return + } + when (message) { + is FeatureOpened -> handleFeatureOpened(message) + } + } + + private suspend fun handleFeatureOpened(message: FeatureOpened) { + openedFeatures.add(message.feature) + val feature = attachedFeatures[message.feature] ?: return + feature.onFeatureOpened(id) + } + + fun isFeatureOpened(featureId: String): Boolean = + openedFeatures.contains(featureId) + + private fun performClientHandshake(): ClientHandshakeResult { + return try { + val outcome = readClientHello() + if (outcome == ClientHelloToken) { + ClientHandshakeResult.Accepted + } else { + ClientHandshakeResult.Rejected("unexpected handshake token") + } + } catch (_: SocketTimeoutException) { + ClientHandshakeResult.Rejected("handshake timeout") + } catch (ioe: IOException) { + ClientHandshakeResult.Rejected(ioe.localizedMessage ?: "handshake failure") + } + } + + private fun readClientHello(): String { + socket.soTimeout = ClientHelloTimeoutMs + val input = socket.inputStream + val buffer = ByteArrayOutputStream() + try { + while (buffer.size() <= ClientHelloMaxBytes) { + val value = input.read() + if (value == -1) { + throw IOException("client handshake closed without data") + } + if (value == '\n'.code) { + val raw = buffer.toString(StandardCharsets.UTF_8.name()) + return raw.trimEnd('\r') + } + buffer.write(value) + } + throw IOException("client handshake exceeded $ClientHelloMaxBytes bytes") + } finally { + socket.soTimeout = 0 + } + } + + private fun markHighPriorityEmission() { + lastHighPriorityEmissionMs = SystemClock.elapsedRealtime() + } + + private fun hasRecentHighPriorityEmission(): Boolean { + val last = lastHighPriorityEmissionMs + if (last == 0L) return false + return SystemClock.elapsedRealtime() - last < HighPriorityIdleThresholdMillis + } + + private fun writeLine(payload: LinkRecord): Boolean { + try { + writer.write(Ndjson.encodeToString(LinkRecord.serializer(), payload)) + writer.write("\n") + writer.flush() + return true + } catch (_: Throwable) { + try { + writer.close() + } catch (_: Throwable) { + } + return false + } + } + + private fun startWriter() { + if (writerJob != null || isClosed()) return + writerJob = scope.launch(Dispatchers.IO) { writerLoop() } + } + + private suspend fun writerLoop() { + var pendingLow: LowPriorityRecord? = null + while (isReady()) { + when (val action = nextWriterAction(pendingLow)) { + is WriterAction.SendHigh -> { + if (!writeHighOrClose(action.record)) return + pendingLow = action.pendingLow + } + + is WriterAction.SendLow -> { + if (!writeLowOrClose(action.record)) return + pendingLow = null + } + + is WriterAction.BufferLow -> pendingLow = action.record + WriterAction.Closed -> return + } + } + } + + private suspend fun nextWriterAction(pendingLow: LowPriorityRecord?): WriterAction { + highPriorityQueue.tryReceive().getOrNull()?.let { + return WriterAction.SendHigh(it, pendingLow) + } + + val low = pendingLow ?: lowPriorityQueue.tryReceive().getOrNull() + if (low != null) { + val incomingHigh = if (shouldDeferLow(low)) waitForHighPriority() else null + return if (incomingHigh != null) { + WriterAction.SendHigh(incomingHigh, low) + } else { + WriterAction.SendLow(low) + } + } + + return when (val next = awaitNextRecord()) { + is QueueItem.High -> WriterAction.SendHigh(next.record, null) + is QueueItem.Low -> WriterAction.BufferLow(next.record) + QueueItem.Closed -> WriterAction.Closed + } + } + + private fun writeHighOrClose(record: LinkRecord): Boolean { + if (!writeLine(record)) { + close() + return false + } + markHighPriorityEmission() + return true + } + + private fun writeLowOrClose(record: LowPriorityRecord): Boolean { + if (!writeLine(record.record)) { + close() + return false + } + return true + } + + private fun shouldDeferLow(record: LowPriorityRecord): Boolean { + if (!hasRecentHighPriorityEmission()) return false + val elapsed = SystemClock.elapsedRealtime() - record.enqueuedAtMs + return elapsed < MaxLowPriorityDeferMillis + } + + @OptIn(ExperimentalCoroutinesApi::class) + private suspend fun waitForHighPriority(): LinkRecord? = + select { + highPriorityQueue.onReceiveCatching { result -> result.getOrNull() } + onTimeout(LowPriorityRetryDelayMillis) { null } + } + + @OptIn(ExperimentalCoroutinesApi::class) + private suspend fun awaitNextRecord(): QueueItem = + select { + highPriorityQueue.onReceiveCatching { result -> + result.getOrNull()?.let { QueueItem.High(it) } ?: QueueItem.Closed + } + lowPriorityQueue.onReceiveCatching { result -> + result.getOrNull()?.let { QueueItem.Low(it) } ?: QueueItem.Closed + } + } + + private sealed interface WriterAction { + data class SendHigh(val record: LinkRecord, val pendingLow: LowPriorityRecord?) : WriterAction + data class SendLow(val record: LowPriorityRecord) : WriterAction + data class BufferLow(val record: LowPriorityRecord) : WriterAction + object Closed : WriterAction + } + + private fun closeQuietly() { + try { + writer.close() + } catch (_: Throwable) { + } + try { + socket.close() + } catch (_: Throwable) { + } + } + + fun setOnCloseListener(listener: (SnapOLinkSession) -> Unit) { + onCloseListener = listener + if (isClosed()) { + listener(this) + } + } + + private fun isReady(): Boolean = + !isClosed() && sessionState == SnapOLinkSessionState.ACTIVE +} + +internal sealed interface ClientHandshakeResult { + object Accepted : ClientHandshakeResult + data class Rejected(val reason: String) : ClientHandshakeResult +} + +internal enum class SnapOLinkSessionState { + CONNECTING, + ACTIVE, + CLOSED, +} + +private const val ClientHelloToken = "HelloSnapO" +private const val ClientHelloTimeoutMs = 1_000 +private const val ClientHelloMaxBytes = 4 * 1024 +private const val HighPriorityIdleThresholdMillis = 150L +private const val LowPriorityRetryDelayMillis = 50L +private const val MaxLowPriorityDeferMillis = 2_000L +private const val HighPriorityQueueCapacity = 512 +private const val LowPriorityQueueCapacity = 256 + +private data class LowPriorityRecord( + val record: LinkRecord, + val enqueuedAtMs: Long, +) + +private sealed interface QueueItem { + data class High(val record: LinkRecord) : QueueItem + data class Low(val record: LowPriorityRecord) : QueueItem + object Closed : QueueItem +} diff --git a/snapo-link-android/network-httpurlconnection/src/main/java/com/openai/snapo/network/httpurlconnection/SnapOHttpUrlInterceptor.kt b/snapo-link-android/network-httpurlconnection/src/main/java/com/openai/snapo/network/httpurlconnection/SnapOHttpUrlInterceptor.kt index 140d2a6..1c02a6d 100644 --- a/snapo-link-android/network-httpurlconnection/src/main/java/com/openai/snapo/network/httpurlconnection/SnapOHttpUrlInterceptor.kt +++ b/snapo-link-android/network-httpurlconnection/src/main/java/com/openai/snapo/network/httpurlconnection/SnapOHttpUrlInterceptor.kt @@ -538,69 +538,101 @@ private class ResponseCapturingInputStream( private fun complete(error: Throwable?) { if (!completed.compareAndSet(false, true)) return val currentContext = context ?: return - val meta = responseMeta val snapshot = capture.snapshot() + val meta = responseMeta val bytes = snapshot.bytes - val totalBytes = snapshot.totalBytes.takeIf { it > 0L } ?: meta?.contentLength - val truncatedBytes = snapshot.truncatedBytes val charset = mediaType?.charsetOrUtf8() ?: Charsets.UTF_8 val isText = mediaType?.isTextLike() == true + val bodyPreview = buildBodyPreview(bytes, isText, charset) + val body = buildBody(bytes, isText, charset) + val totalBytes = snapshot.totalBytes.takeIf { it > 0L } ?: meta?.contentLength + val responseWall = meta?.responseWall ?: System.currentTimeMillis() + val responseMono = meta?.responseMono ?: SystemClock.elapsedRealtimeNanos() - val bodyPreview = if (interceptor.responseBodyPreviewBytes > 0 && bytes.isNotEmpty()) { - val previewLimit = interceptor.responseBodyPreviewBytes.coerceAtMost(bytes.size) - if (isText) { - String(bytes, 0, previewLimit, charset) - } else { - encodeToString(bytes, 0, previewLimit, NO_WRAP) - } + publishResponseIfNeeded( + currentContext = currentContext, + meta = meta, + bodyPreview = bodyPreview, + body = body, + truncatedBytes = snapshot.truncatedBytes, + totalBytes = totalBytes, + responseWall = responseWall, + responseMono = responseMono, + error = error, + ) + publishFailureIfNeeded(currentContext, error) + } + + private fun buildBodyPreview( + bytes: ByteArray, + isText: Boolean, + charset: java.nio.charset.Charset, + ): String? { + val previewLimit = interceptor.responseBodyPreviewBytes + if (previewLimit <= 0 || bytes.isEmpty()) return null + val limit = previewLimit.coerceAtMost(bytes.size) + return if (isText) { + String(bytes, 0, limit, charset) } else { - null + encodeToString(bytes, 0, limit, NO_WRAP) } + } - val body = if (bytes.isNotEmpty()) { - if (isText) { - String(bytes, charset) - } else { - encodeToString(bytes, NO_WRAP) - } + private fun buildBody( + bytes: ByteArray, + isText: Boolean, + charset: java.nio.charset.Charset, + ): String? { + if (bytes.isEmpty()) return null + return if (isText) { + String(bytes, charset) } else { - null + encodeToString(bytes, NO_WRAP) } + } - val responseWall = meta?.responseWall ?: System.currentTimeMillis() - val responseMono = meta?.responseMono ?: SystemClock.elapsedRealtimeNanos() - + private fun publishResponseIfNeeded( + currentContext: InterceptContext, + meta: ResponseMeta?, + bodyPreview: String?, + body: String?, + truncatedBytes: Long?, + totalBytes: Long?, + responseWall: Long, + responseMono: Long, + error: Throwable?, + ) { val hasPayload = !body.isNullOrEmpty() || !bodyPreview.isNullOrEmpty() || truncatedBytes != null - if (hasPayload || error != null) { - interceptor.publish { - ResponseReceived( - id = currentContext.requestId, - tWallMs = responseWall, - tMonoNs = responseMono, - code = meta?.code ?: -1, - headers = meta?.headers ?: emptyList(), - bodyPreview = bodyPreview, - body = body, - bodyTruncatedBytes = truncatedBytes, - bodySize = totalBytes, - timings = Timings(totalMs = nanosToMillis(responseMono - currentContext.startMono)), - ) - } + if (!hasPayload && error == null) return + interceptor.publish { + ResponseReceived( + id = currentContext.requestId, + tWallMs = responseWall, + tMonoNs = responseMono, + code = meta?.code ?: -1, + headers = meta?.headers ?: emptyList(), + bodyPreview = bodyPreview, + body = body, + bodyTruncatedBytes = truncatedBytes, + bodySize = totalBytes, + timings = Timings(totalMs = nanosToMillis(responseMono - currentContext.startMono)), + ) } + } - if (error != null) { - val failWall = System.currentTimeMillis() - val failMono = SystemClock.elapsedRealtimeNanos() - interceptor.publish { - RequestFailed( - id = currentContext.requestId, - tWallMs = failWall, - tMonoNs = failMono, - errorKind = error.javaClass.simpleName.ifEmpty { error.javaClass.name }, - message = error.message, - timings = Timings(totalMs = nanosToMillis(failMono - currentContext.startMono)), - ) - } + private fun publishFailureIfNeeded(currentContext: InterceptContext, error: Throwable?) { + if (error == null) return + val failWall = System.currentTimeMillis() + val failMono = SystemClock.elapsedRealtimeNanos() + interceptor.publish { + RequestFailed( + id = currentContext.requestId, + tWallMs = failWall, + tMonoNs = failMono, + errorKind = error.javaClass.simpleName.ifEmpty { error.javaClass.name }, + message = error.message, + timings = Timings(totalMs = nanosToMillis(failMono - currentContext.startMono)), + ) } } } @@ -729,12 +761,10 @@ private fun parseMediaType(value: String?): ParsedMediaType? { var charset: java.nio.charset.Charset? = null for (index in 1 until parts.size) { - val param = parts[index].trim() - if (!param.startsWith("charset=", ignoreCase = true)) continue - val valueStart = param.indexOf('=') + 1 - if (valueStart <= 0 || valueStart >= param.length) continue - val rawCharset = param.substring(valueStart).trim().trim('"') - charset = runCatching { java.nio.charset.Charset.forName(rawCharset) }.getOrNull() + val rawCharset = extractCharset(parts[index].trim()) + if (rawCharset != null) { + charset = runCatching { java.nio.charset.Charset.forName(rawCharset) }.getOrNull() + } } return ParsedMediaType(type = type, subtype = subtype, charset = charset) @@ -761,6 +791,13 @@ private fun ParsedMediaType.isEventStream(): Boolean = private fun ParsedMediaType.charsetOrUtf8(): java.nio.charset.Charset = charset ?: Charsets.UTF_8 +private fun extractCharset(param: String): String? { + if (!param.startsWith("charset=", ignoreCase = true)) return null + val valueStart = param.indexOf('=') + 1 + if (valueStart <= 0 || valueStart >= param.length) return null + return param.substring(valueStart).trim().trim('"') +} + private fun resolveEffectiveMaxBytes(maxBytes: Int, contentLength: Long?): Int { if (maxBytes <= 0) return 0 if (contentLength == null) return maxBytes diff --git a/snapo-link-android/network/src/main/java/com/openai/snapo/network/NetworkInspectorFeature.kt b/snapo-link-android/network/src/main/java/com/openai/snapo/network/NetworkInspectorFeature.kt index cf89caf..af653b0 100644 --- a/snapo-link-android/network/src/main/java/com/openai/snapo/network/NetworkInspectorFeature.kt +++ b/snapo-link-android/network/src/main/java/com/openai/snapo/network/NetworkInspectorFeature.kt @@ -1,10 +1,10 @@ package com.openai.snapo.network +import com.openai.snapo.link.core.ClientId +import com.openai.snapo.link.core.EventPriority import com.openai.snapo.link.core.LinkEventSink import com.openai.snapo.link.core.SnapOLinkFeature import com.openai.snapo.link.core.SnapOLinkRegistry -import com.openai.snapo.link.core.sendHighPriority -import com.openai.snapo.link.core.sendLowPriority import com.openai.snapo.network.record.ResponseReceived import com.openai.snapo.network.record.SnapONetRecord import kotlinx.coroutines.CoroutineScope @@ -28,7 +28,7 @@ data class NetworkInspectorConfig( ) class NetworkInspectorFeature( - private val config: NetworkInspectorConfig = NetworkInspectorConfig(), + config: NetworkInspectorConfig = NetworkInspectorConfig(), private val scope: CoroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.Default), ) : SnapOLinkFeature { @@ -37,79 +37,63 @@ class NetworkInspectorFeature( private val eventBuffer = EventBuffer(config) @Volatile - private var sink: RecordSink? = null + private var sink: NetworkEventSink? = null - @Volatile - private var isOpen: Boolean = false - - @Volatile - private var hasReplayedSnapshot: Boolean = false - - override suspend fun onClientConnected(sink: LinkEventSink) { - val recordSink = RecordSink(sink) - this.sink = recordSink - isOpen = false - hasReplayedSnapshot = false + override fun onLinkAvailable(sink: LinkEventSink) { + this.sink = NetworkEventSink(sink) } - override suspend fun onFeatureOpened() { - val currentSink = sink ?: return - isOpen = true - if (hasReplayedSnapshot) return + override suspend fun onFeatureOpened(clientId: Long) { + val current = sink ?: return + val target = ClientId.Specific(clientId) val deferredBodies = mutableListOf() - val snapshot: List = bufferLock.withLock { eventBuffer.snapshot() } + val snapshot = bufferLock.withLock { eventBuffer.snapshot() } for (record in snapshot) { if (record is ResponseReceived && record.hasBodyPayload()) { - currentSink.high(record.withoutBodyPayload()) + current.send(record.withoutBodyPayload(), clientId = target) deferredBodies.add(record) } else { - currentSink.high(record) + current.send(record, clientId = target) } } - hasReplayedSnapshot = true - scheduleDeferredBodies(currentSink, deferredBodies) - } - - override fun onClientDisconnected() { - sink = null - isOpen = false - hasReplayedSnapshot = false + scheduleDeferredBodies(current, target, deferredBodies) } suspend fun publish(record: SnapONetRecord) { bufferLock.withLock { eventBuffer.append(record) } - val currentSink = sink - if (!isOpen || currentSink == null) return + val currentSink = sink ?: return when (record) { is ResponseReceived -> { if (record.hasBodyPayload()) { - currentSink.high(record.withoutBodyPayload()) + currentSink.send(record.withoutBodyPayload()) scheduleResponseBody(currentSink, record) } else { - currentSink.high(record) + currentSink.send(record) } } - else -> currentSink.high(record) + else -> currentSink.send(record) } } private fun scheduleDeferredBodies( - sink: RecordSink, + sink: NetworkEventSink, + target: ClientId, deferredBodies: List, ) { deferredBodies.forEachIndexed { index, response -> val stagger = ResponseBodyStaggerMillis * index - scheduleResponseBody(sink, response, ResponseBodyDelayMillis + stagger) + scheduleResponseBody(sink, response, target, ResponseBodyDelayMillis + stagger) } } private fun scheduleResponseBody( - sink: RecordSink, + sink: NetworkEventSink, record: ResponseReceived, + target: ClientId = ClientId.All, initialDelayMs: Long = ResponseBodyDelayMillis, ) { scope.launch(Dispatchers.IO) { @@ -117,8 +101,7 @@ class NetworkInspectorFeature( if (initialDelayMs > 0) { delay(initialDelayMs) } - if (!isOpen || this@NetworkInspectorFeature.sink !== sink) return@launch - sink.low(record) + sink.send(record, clientId = target, priority = EventPriority.Low) } catch (t: CancellationException) { throw t } catch (_: Throwable) { @@ -128,22 +111,11 @@ class NetworkInspectorFeature( private fun ResponseReceived.hasBodyPayload(): Boolean { if (!body.isNullOrEmpty()) return true - if (!bodyPreview.isNullOrEmpty()) return true return false } private fun ResponseReceived.withoutBodyPayload(): ResponseReceived = - copy(bodyPreview = null, body = null) -} - -private class RecordSink(private val delegate: LinkEventSink) { - suspend fun high(record: SnapONetRecord) { - delegate.sendHighPriority(record) - } - - suspend fun low(record: SnapONetRecord) { - delegate.sendLowPriority(record) - } + copy(body = null) } object NetworkInspector { @@ -163,10 +135,19 @@ object NetworkInspector { /** Return the active feature, or null if not initialized. */ fun getOrNull(): NetworkInspectorFeature? = feature - - // Backwards-friendly alias. - fun featureOrNull(): NetworkInspectorFeature? = feature } private const val ResponseBodyDelayMillis = 200L private const val ResponseBodyStaggerMillis = 25L + +private class NetworkEventSink( + private val delegate: LinkEventSink, +) { + fun send( + record: SnapONetRecord, + clientId: ClientId = ClientId.All, + priority: EventPriority = EventPriority.High, + ) { + delegate.send(record, SnapONetRecord.serializer(), clientId, priority) + } +} From 5aaefb5877f97f51ea79af92bcb692c3181b0d54 Mon Sep 17 00:00:00 2001 From: Richard Zadorozny Date: Wed, 21 Jan 2026 14:06:14 -0500 Subject: [PATCH 2/2] Address codex feedback via codex --- .../main/java/com/openai/snapo/link/core/SnapOLinkFeature.kt | 2 +- .../main/java/com/openai/snapo/link/core/SnapOLinkSession.kt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOLinkFeature.kt b/snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOLinkFeature.kt index a9109db..6e06242 100644 --- a/snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOLinkFeature.kt +++ b/snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOLinkFeature.kt @@ -10,7 +10,7 @@ interface SnapOLinkFeature { /** Called once when the link server is available so features can broadcast or target clients. */ fun onLinkAvailable(sink: LinkEventSink) {} - /** Invoked when a specific client opens this feature. */ + /** Invoked once per client session when a client opens this feature. */ suspend fun onFeatureOpened(clientId: Long) /** Invoked when a client disconnects. */ diff --git a/snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOLinkSession.kt b/snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOLinkSession.kt index e0597d3..a6f84d9 100644 --- a/snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOLinkSession.kt +++ b/snapo-link-android/link-core/src/main/java/com/openai/snapo/link/core/SnapOLinkSession.kt @@ -169,8 +169,8 @@ internal class SnapOLinkSession( } private suspend fun handleFeatureOpened(message: FeatureOpened) { - openedFeatures.add(message.feature) val feature = attachedFeatures[message.feature] ?: return + if (!openedFeatures.add(message.feature)) return feature.onFeatureOpened(id) }