From 6dc8cc6bd56cee4eb37e3e75feda740e40ee09a3 Mon Sep 17 00:00:00 2001 From: Edbert Chan Date: Wed, 15 Jan 2025 16:23:57 -0800 Subject: [PATCH] Refactor Persistent Workers The current persistent worker implementation has a bug: an OOM breaks the RxJava pipeline, so the failure is never relayed back to the Bazel server and the build runs indefinitely. Rules Kotlin ships a stable persistent worker implementation, so we piggyback on that public implementation instead. --- MODULE.bazel | 5 + rules/impl.bzl | 6 +- src/cli/AndroidLintAction.kt | 34 +++++ src/cli/AndroidLintPersistentWorker.kt | 77 +++++++++++ src/worker/BUILD | 1 + src/worker/PersistentWorker.kt | 125 ------------------ ...PersistentWorkerCpuTimeBasedGcScheduler.kt | 41 ------ src/worker/Worker.kt | 9 +- .../rules_kotlin_worker_visibilty.patch | 12 ++ 9 files changed, 135 insertions(+), 175 deletions(-) create mode 100644 src/cli/AndroidLintPersistentWorker.kt delete mode 100644 src/worker/PersistentWorker.kt delete mode 100644 src/worker/PersistentWorkerCpuTimeBasedGcScheduler.kt create mode 100644 third_party/rules_kotlin_worker_visibilty.patch diff --git a/MODULE.bazel b/MODULE.bazel index cdd6dc4..0319031 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -7,6 +7,11 @@ module( bazel_dep(name = "rules_java", version = "8.6.3") bazel_dep(name = "rules_jvm_external", version = "5.3") bazel_dep(name = "rules_kotlin", version = "2.1.0") +single_version_override( + module_name = "rules_kotlin", + patch_strip = 1, + patches = ["//third_party:rules_kotlin_worker_visibilty.patch"], +) bazel_dep(name = "bazel_skylib", version = "1.7.1") bazel_dep(name = "platforms", version = "0.0.8") diff --git a/rules/impl.bzl b/rules/impl.bzl index f4f8258..122be8c 100644 --- a/rules/impl.bzl +++ b/rules/impl.bzl @@ -15,6 +15,9 @@ load( _ANDROID_LINT_TOOLCHAIN_TYPE = "ANDROID_LINT_TOOLCHAIN_TYPE", _utils = "utils", ) +# Resource estimate is based on VisualVM runs, assuming the GC kicks in +def 
_resource_set_callback(os_name, num_inputs): + return {"cpu": 5, "memory": 10000} def _run_android_lint( ctx, @@ -135,12 +138,13 @@ def _run_android_lint( execution_requirements = { "supports-workers": "1", "supports-multiplex-workers": "1", - "requires-worker-protocol": "json", + "requires-worker-protocol": "proto", }, env = { # https://googlesamples.github.io/android-custom-lint-rules/usage/variables.md.html "ANDROID_LINT_SKIP_BYTECODE_VERIFIER": ("true" if android_lint_skip_bytecode_verifier else "false"), }, + resource_set = _resource_set_callback, ) def _get_module_name(ctx): diff --git a/src/cli/AndroidLintAction.kt b/src/cli/AndroidLintAction.kt index 47addfe..39cea93 100644 --- a/src/cli/AndroidLintAction.kt +++ b/src/cli/AndroidLintAction.kt @@ -1,18 +1,52 @@ package com.rules.android.lint.cli import com.rules.android.lint.worker.Worker +import io.bazel.worker.PersistentWorker +import io.bazel.worker.Status +import io.bazel.worker.Work +import io.bazel.worker.WorkerContext import java.io.PrintStream import java.nio.file.Files +import javax.inject.Inject import kotlin.system.exitProcess object AndroidLintAction { @JvmStatic fun main(args: Array) { + if ("--persistent_worker" in args) { + val worker = PersistentWorker() + worker.start(AndroidLintPersistentWorker()).run(::exitProcess) + return + } val worker = Worker.fromArgs(args, AndroidLintExecutor()) val exitCode = worker.processRequests() exitProcess(exitCode) } + private class AndroidLintPersistentWorker + @Inject + constructor() : Work { + override fun invoke( + ctx: WorkerContext.TaskContext, + args: Iterable, + ): Status { + val workingDirectory = Files.createTempDirectory("rules") + try { + val parsedArgs = AndroidLintActionArgs.parseArgs(args.toList()) + val result = AndroidLintRunner().runAndroidLint(parsedArgs, workingDirectory) + return if (result != 0) Status.ERROR else Status.SUCCESS + } catch (exception: Exception) { + return Status.ERROR + } finally { + try { + 
workingDirectory.toFile().deleteRecursively() + } catch (e: Exception) { + e.printStackTrace() + } + } + } + } + private class AndroidLintExecutor : Worker.WorkRequestCallback { override fun processWorkRequest( args: List, diff --git a/src/cli/AndroidLintPersistentWorker.kt b/src/cli/AndroidLintPersistentWorker.kt new file mode 100644 index 0000000..179284a --- /dev/null +++ b/src/cli/AndroidLintPersistentWorker.kt @@ -0,0 +1,77 @@ +package com.rules.android.lint.cli + +import com.rules.android.lint.worker.Worker +import io.bazel.worker.PersistentWorker +import io.bazel.worker.Status +import io.bazel.worker.Work +import io.bazel.worker.WorkerContext +import java.io.PrintStream +import java.nio.file.Files +import javax.inject.Inject +import kotlin.system.exitProcess + +object AndroidLintPersistentWorker { + @JvmStatic + fun main(args: Array) { + if ("--persistent_worker" in args) { + val worker = PersistentWorker() + worker.start(AndroidLint()).run(::exitProcess) + return + } + val worker = Worker.fromArgs(args, AndroidLintExecutor()) + val exitCode = worker.processRequests() + exitProcess(exitCode) + } + + private class AndroidLint + @Inject + constructor() : Work { + override fun invoke( + ctx: WorkerContext.TaskContext, + args: Iterable, + ): Status { + val workingDirectory = Files.createTempDirectory("rules") + try { + val runner = AndroidLintRunner() + val parsedArgs = AndroidLintActionArgs.parseArgs(args.toList()) + val result = runner.runAndroidLint(parsedArgs, workingDirectory) + if (result == 0) { + return Status.SUCCESS + } + return Status.ERROR + } catch (exception: Exception) { + return Status.ERROR + } finally { + try { + workingDirectory.toFile().deleteRecursively() + } catch (e: Exception) { + e.printStackTrace() + } + } + } + } + + private class AndroidLintExecutor : Worker.WorkRequestCallback { + override fun processWorkRequest( + args: List, + printStream: PrintStream, + ): Int { + val workingDirectory = Files.createTempDirectory("rules") + + try { 
+ val runner = AndroidLintRunner() + val parsedArgs = AndroidLintActionArgs.parseArgs(args) + return runner.runAndroidLint(parsedArgs, workingDirectory) + } catch (exception: Exception) { + exception.printStackTrace() + return 1 + } finally { + try { + workingDirectory.toFile().deleteRecursively() + } catch (e: Exception) { + e.printStackTrace() + } + } + } + } +} diff --git a/src/worker/BUILD b/src/worker/BUILD index 340bb2d..e7b4615 100644 --- a/src/worker/BUILD +++ b/src/worker/BUILD @@ -12,6 +12,7 @@ kt_jvm_library( "@rules_android_lint_deps//:com_squareup_okio_okio_jvm", "@rules_android_lint_deps//:io_reactivex_rxjava3_rxjava", "@rules_android_lint_deps//:org_reactivestreams_reactive_streams", + "@rules_kotlin//src/main/kotlin:worker", ], ) diff --git a/src/worker/PersistentWorker.kt b/src/worker/PersistentWorker.kt deleted file mode 100644 index 9681426..0000000 --- a/src/worker/PersistentWorker.kt +++ /dev/null @@ -1,125 +0,0 @@ -package com.rules.android.lint.worker - -import io.reactivex.rxjava3.core.BackpressureStrategy -import io.reactivex.rxjava3.core.Flowable -import io.reactivex.rxjava3.core.Scheduler -import io.reactivex.rxjava3.schedulers.Schedulers -import java.io.BufferedOutputStream -import java.io.ByteArrayOutputStream -import java.io.IOException -import java.io.PrintStream - -internal class PersistentWorker( - /** - * WorkerIO instance wrapping the standard output streams - */ - private val workerIO: WorkerIO, - /** - * Rxjava Scheduler to execute work requests on. 
- */ - private val scheduler: Scheduler, - /** - * Instance of CpuTimeBasedGcScheduler that will run periodically - */ - private val persistentWorkerCpuTimeBasedGcScheduler: PersistentWorkerCpuTimeBasedGcScheduler, - /** - * Instance of CpuTimeBasedGcScheduler that will run periodically - */ - private val workRequestProcessor: Worker.WorkerMessageProcessor, - /** - * Instance of CpuTimeBasedGcScheduler that will run periodically - */ - private val workerWorkRequestCallback: Worker.WorkRequestCallback, -) : Worker { - constructor( - workerMessageProcessor: Worker.WorkRequestCallback, - ) : this( - workerIO = WorkerIO(), - scheduler = Schedulers.io(), - persistentWorkerCpuTimeBasedGcScheduler = PersistentWorkerCpuTimeBasedGcScheduler(), - workRequestProcessor = - WorkerJsonMessageProcessor( - System.`in`, - System.out, - ), - workerWorkRequestCallback = workerMessageProcessor, - ) - - /** - * Initiate the worker and begin processing work requests - */ - override fun processRequests(): Int { - return workerIO.use { io -> - // Start by redirecting the system streams so that nothing - // corrupts the streams that the worker uses - io.redirectSystemStreams() - - // Process requests as they come in using RxJava - Flowable - .create( - { emitter -> - while (!emitter.isCancelled) { - try { - val request: WorkRequest = workRequestProcessor.readWorkRequest() - emitter.onNext(request) - } catch (e: IOException) { - emitter.onError(e) - } - } - }, - BackpressureStrategy.BUFFER, - ).subscribeOn(scheduler) - .parallel() - .runOn(scheduler) - // Execute the work and map the result to a work response - .map { request -> return@map this.respondToRequest(request) } - // Run the garbage collector periodically so that we are a good responsible worker - .doOnNext { persistentWorkerCpuTimeBasedGcScheduler.maybePerformGc() } - .doOnError { it.printStackTrace() } - .sequential() - .observeOn(scheduler) - .blockingSubscribe { response -> - workRequestProcessor.writeWorkResponse(response) - 
} - return@use 0 - } - } - - private fun respondToRequest(request: WorkRequest): WorkResponse { - ByteArrayOutputStream().use { baos -> - // Create a print stream that the execution can write logs to - val printStream = PrintStream(BufferedOutputStream(ByteArrayOutputStream())) - var exitCode: Int - try { - // Sanity check the work request arguments - val arguments = - requireNotNull(request.arguments) { - "Request with id ${request.requestId} " + - "does not have arguments!" - } - require(arguments.isNotEmpty()) { - "Request with id ${request.requestId} " + - "does not have arguments!" - } - exitCode = workerWorkRequestCallback.processWorkRequest(arguments, printStream) - } catch (e: Exception) { - e.printStackTrace(printStream) - exitCode = 1 - } finally { - printStream.flush() - } - - val output = - arrayOf(baos.toString()) - .asSequence() - .map { it.trim() } - .filter { it.isNotEmpty() } - .joinToString("\n") - return WorkResponse( - exitCode = exitCode, - output = output, - requestId = request.requestId, - ) - } - } -} diff --git a/src/worker/PersistentWorkerCpuTimeBasedGcScheduler.kt b/src/worker/PersistentWorkerCpuTimeBasedGcScheduler.kt deleted file mode 100644 index 6f28987..0000000 --- a/src/worker/PersistentWorkerCpuTimeBasedGcScheduler.kt +++ /dev/null @@ -1,41 +0,0 @@ -package com.rules.android.lint.worker - -import com.sun.management.OperatingSystemMXBean -import java.lang.management.ManagementFactory -import java.time.Duration -import java.util.concurrent.atomic.AtomicReference - -internal class PersistentWorkerCpuTimeBasedGcScheduler( - /** - * After this much CPU time has elapsed, we may force a GC run. Set to [Duration.ZERO] to - * disable. - */ - private val cpuUsageBeforeGc: Duration = Duration.ofSeconds(10), -) { - private val cpuTime: Duration - get() = if (!cpuUsageBeforeGc.isZero) Duration.ofNanos(bean.processCpuTime) else Duration.ZERO - - /** The total process CPU time at the last GC run (or from the start of the worker). 
*/ - private val cpuTimeAtLastGc: AtomicReference = AtomicReference(cpuTime) - - /** Call occasionally to perform a GC if enough CPU time has been used. */ - fun maybePerformGc() { - if (!cpuUsageBeforeGc.isZero) { - val currentCpuTime = cpuTime - val lastCpuTime = cpuTimeAtLastGc.get() - // Do GC when enough CPU time has been used, but only if nobody else beat us to it. - if (currentCpuTime.minus(lastCpuTime) > cpuUsageBeforeGc && - cpuTimeAtLastGc.compareAndSet(lastCpuTime, currentCpuTime) - ) { - System.gc() - // Avoid counting GC CPU time against CPU time before next GC. - cpuTimeAtLastGc.compareAndSet(currentCpuTime, cpuTime) - } - } - } - - companion object { - /** Used to get the CPU time used by this process. */ - private val bean = ManagementFactory.getOperatingSystemMXBean() as OperatingSystemMXBean - } -} diff --git a/src/worker/Worker.kt b/src/worker/Worker.kt index ca81694..1c72c01 100644 --- a/src/worker/Worker.kt +++ b/src/worker/Worker.kt @@ -27,17 +27,10 @@ interface Worker { companion object { /** * Creates the appropriate worker instance using the provided worker arguments. - * - * If `--persistent_worker` exists in the arguments, an instance of PersistentWorker will - * be returned. Otherwise an instance of InvocationWorker will be returned. 
*/ fun fromArgs( args: Array, workerMessageProcessor: WorkRequestCallback, - ): Worker = - when { - "--persistent_worker" in args -> PersistentWorker(workerMessageProcessor) - else -> InvocationWorker(args, workerMessageProcessor) - } + ): Worker = InvocationWorker(args, workerMessageProcessor) } } diff --git a/third_party/rules_kotlin_worker_visibilty.patch b/third_party/rules_kotlin_worker_visibilty.patch new file mode 100644 index 0000000..3a1b48f --- /dev/null +++ b/third_party/rules_kotlin_worker_visibilty.patch @@ -0,0 +1,12 @@ +diff --git a/src/main/kotlin/BUILD.bazel b/src/main/kotlin/BUILD.bazel +index 5f04f0e..6668b05 100755 +--- a/src/main/kotlin/BUILD.bazel ++++ b/src/main/kotlin/BUILD.bazel +@@ -17,6 +17,7 @@ load("@rules_java//java:defs.bzl", "java_binary", "java_import") + java_import( + name = "worker", + jars = ["kotlin_worker.jar"], ++ visibility = ["//visibility:public"], + ) + + java_import( \ No newline at end of file