From 98dc99fd1981c8b73f5dc126f9b21a646cad66b9 Mon Sep 17 00:00:00 2001 From: denini08 Date: Fri, 15 Aug 2025 16:17:19 -0300 Subject: [PATCH 1/3] working in mac --- devlib/__init__.py | 2 +- devlib/target.py | 441 ++++++++++++++++++++++++++++++++++++++++++ tests/test_config.yml | 2 +- tests/test_target.py | 7 +- 4 files changed, 449 insertions(+), 3 deletions(-) diff --git a/devlib/__init__.py b/devlib/__init__.py index e496299b1..105e544cd 100644 --- a/devlib/__init__.py +++ b/devlib/__init__.py @@ -18,7 +18,7 @@ ''' from devlib.target import ( - Target, LinuxTarget, AndroidTarget, LocalLinuxTarget, + Target, LinuxTarget, AndroidTarget, LocalLinuxTarget, LocalTarget, ChromeOsTarget, ) diff --git a/devlib/target.py b/devlib/target.py index 48c0acf04..5b0ea9fc9 100644 --- a/devlib/target.py +++ b/devlib/target.py @@ -3138,6 +3138,447 @@ def _resolve_paths(self): self.working_directory = '/tmp/devlib-target' +class LocalTarget(Target): + """ + Auto-detecting local target that works on both Linux and macOS. + + This class automatically detects the underlying operating system and provides + appropriate implementations for each platform, using native commands instead + of busybox when needed. + """ + + path = posixpath + + def __init__(self, + connection_settings=None, + platform=None, + working_directory=None, + executables_directory=None, + connect=True, + modules=None, + load_default_modules=True, + shell_prompt=DEFAULT_SHELL_PROMPT, + conn_cls=LocalConnection, + is_container=False, + max_async=50, + tmp_directory=None, + ): + super(LocalTarget, self).__init__(connection_settings=connection_settings, + platform=platform, + working_directory=working_directory, + executables_directory=executables_directory, + connect=connect, + modules=modules, + load_default_modules=load_default_modules, + shell_prompt=shell_prompt, + conn_cls=conn_cls, + is_container=is_container, + max_async=max_async, + tmp_directory=tmp_directory, + ) + + @property + @memoized + def os(self): + """Detect the operating system""" + try: + uname_output = self.execute('uname -s').strip() + if uname_output == 'Darwin': + return 'darwin' + elif uname_output == 'Linux': + return 'linux' + else: + return 'unknown' + except TargetStableError: + return 'unknown' + + def _is_macos(self): + """Check if running on macOS""" + return self.os == 'darwin' + + def _is_linux(self): + """Check if running on Linux""" + return self.os == 'linux' + + @property + @memoized + def abi(self): + """Get the system architecture""" + try: + arch = self.execute('uname -m').strip() + if self._is_macos(): + # Map macOS architecture names to common names + arch_map = { + 'arm64': 'arm64', + 'x86_64': 'x86_64', + 'i386': 'x86', + } + return arch_map.get(arch, arch) + else: + # Linux architecture mapping + return ABI_MAP.get(arch, arch) + except TargetStableError: + return 'unknown' + + @property + @memoized + def os_version(self): + """Get OS version information""" + if self._is_macos(): + try: + # Get macOS version + version = self.execute('sw_vers -productVersion').strip() + build = self.execute('sw_vers -buildVersion').strip() + name = self.execute('sw_vers -productName').strip() + return { + 'name': name, + 'version': version, + 'build': build, + } + except TargetStableError: + return {} + elif self._is_linux(): + try: + os_version = {} + if self.file_exists('/etc/os-release'): + output = self.execute('cat /etc/os-release') + for line in output.split('\n'): + if '=' in line and not line.startswith('#'): + key, value = line.split('=', 1) + os_version[key] = value.strip('"') + return os_version + except TargetStableError: + return {} + return {} + + @property + @memoized + def system_id(self): + """Get a unique system identifier""" + if self._is_macos(): + try: + # Use hardware UUID as unique identifier + uuid = self.execute('system_profiler SPHardwareDataType | grep "Hardware UUID" | awk \'{print $3}\'').strip() + kernel = self.execute('uname -r').strip() + return f'{uuid}/{kernel}' + except TargetStableError: + return 'unknown' + elif self._is_linux(): + try: + kernel = self.execute('uname -r') + hardware = self.execute('ip a | grep "link/ether" | sed "s/://g" | awk \'{print $2}\' | tr -d "\\n"') + try: + filesystem = self.execute('ls /dev/disk/by-uuid | tr "\\n" "-" | sed "s/-$//"') + except TargetStableError: + filesystem = 'unknown' + return f'{hardware}/{kernel}/{filesystem}' + except TargetStableError: + return 'unknown' + return 'unknown' + + @property + @memoized + def cpuinfo(self): + """Get CPU information, using platform-appropriate method""" + try: + # First try the standard Linux approach + return Cpuinfo(self.execute('cat /proc/cpuinfo')) + except TargetStableError: + if self._is_macos(): + # If /proc/cpuinfo doesn't exist (e.g., on macOS), use sysctl + return self._get_macos_cpuinfo() + else: + # For other systems, re-raise the error + raise + + def _get_macos_cpuinfo(self): + """Generate /proc/cpuinfo-like output on macOS using sysctl""" + # Get CPU count + try: + ncpu = int(self.execute('sysctl -n hw.ncpu').strip()) + except (TargetStableError, ValueError): + ncpu = 1 + + # Get CPU brand string + try: + brand_string = self.execute('sysctl -n machdep.cpu.brand_string').strip() + except TargetStableError: + brand_string = 'Unknown CPU' + + # Get CPU family, model, etc. (if available) + cpu_family = '' + cpu_model = '' + try: + cpu_family = self.execute('sysctl -n machdep.cpu.family').strip() + except TargetStableError: + pass + + try: + cpu_model = self.execute('sysctl -n machdep.cpu.model').strip() + except TargetStableError: + pass + + # Generate /proc/cpuinfo-like format + cpuinfo_text = [] + for i in range(ncpu): + section = [ + f'processor\t: {i}', + f'model name\t: {brand_string}', + ] + if cpu_family: + section.append(f'cpu family\t: {cpu_family}') + if cpu_model: + section.append(f'model\t\t: {cpu_model}') + + cpuinfo_text.append('\n'.join(section)) + + return Cpuinfo('\n\n'.join(cpuinfo_text)) + + @property + def hostname(self): + """Get hostname using appropriate command for the platform""" + if self._is_macos() or self._is_linux(): + # Both macOS and Linux have native hostname command + return self.execute('hostname').strip() + else: + # Fallback to busybox for other systems + return self.execute('{} hostname'.format(self.busybox)).strip() + + @property + @memoized + def kernel_version(self): + """Get kernel version using appropriate command for the platform""" + if self._is_macos() or self._is_linux(): + # Both macOS and Linux have native uname command + return KernelVersion(self.execute('uname -r -v').strip()) + else: + # Fallback to busybox for other systems + return KernelVersion(self.execute('{} uname -r -v'.format(quote(self.busybox))).strip()) + + @property + def hostid(self): + """Get host ID using appropriate method for the platform""" + if self._is_macos(): + try: + return int(self.execute('hostid').strip(), 16) + except (TargetStableError, ValueError): + # hostid might not be available on macOS, return a default + return 0 + elif self._is_linux(): + try: + return int(self.execute('hostid').strip(), 16) + except (TargetStableError, ValueError): + # Fallback to busybox + return int(self.execute('{} hostid'.format(self.busybox)).strip(), 16) + else: + # Fallback to busybox for other systems + return int(self.execute('{} hostid'.format(self.busybox)).strip(), 16) + + @asyn.asyncf + async def write_value(self, path, value, verify=True, as_root=True): + """Write value to file using platform-appropriate command""" + if self._is_macos() or self._is_linux(): + # Both macOS and Linux have native printf + self.async_manager.track_access( + asyn.PathAccess(namespace='target', path=path, mode='w') + ) + value = str(value) + + if verify: + # Use the same verification logic as the parent class but with native commands + cmd = ''' +orig=$(cat {path} 2>/dev/null || printf "") +printf "%s" {value} > {path} || exit 10 +if [ {value} != "$orig" ]; then + trials=0 + while [ "$(cat {path} 2>/dev/null)" != {value} ]; do + if [ $trials -ge 10 ]; then + cat {path} + exit 11 + fi + sleep 0.01 + trials=$((trials + 1)) + done +fi +''' + else: + cmd = 'printf "%s" {value} > {path}' + cmd = cmd.format(path=quote(path), value=quote(value)) + + try: + await self.execute.asyn(cmd, check_exit_code=True, as_root=as_root) + except TargetCalledProcessError as e: + if e.returncode == 10: + raise TargetStableError('Could not write "{value}" to {path}: {e.output}'.format( + value=value, path=path, e=e)) + elif verify and e.returncode == 11: + raise TargetStableError( + 'Could not write "{value}" to {path}, reading "{actual}": {e.output}'.format( + value=value, path=path, actual=e.output, e=e)) + else: + # Fallback to parent implementation using busybox + await super(LocalTarget, self).write_value(path, value, verify, as_root) + + @asyn.asyncf + async def read_tree_values_flat(self, path, depth=1, check_exit_code=True): + """Read tree values using platform-appropriate commands""" + if self._is_macos() or self._is_linux(): + # Both macOS and Linux have native find and grep + self.async_manager.track_access( + asyn.PathAccess(namespace='target', path=path, mode='r') + ) + + # Use native find and grep commands instead of busybox + # This mimics what the shutils script does: find files then grep them all + command = f'find {quote(path)} -follow -maxdepth {depth} -type f | xargs grep -s ""' + try: + output = await self.execute.asyn(command, check_exit_code=check_exit_code, as_root=self.is_rooted) + except TargetStableError: + # If the directory doesn't exist or find fails, return empty dict + return {} + + accumulator = defaultdict(list) + for entry in output.strip().split('\n'): + if ':' not in entry: + continue + file_path, value = entry.strip().split(':', 1) + accumulator[file_path].append(value) + + result = {k: '\n'.join(v).strip() for k, v in accumulator.items()} + return result + else: + # Fallback to parent implementation using busybox + return await super(LocalTarget, self).read_tree_values_flat(path, depth, check_exit_code) + + async def _list_directory(self, path, as_root=False): + """List directory contents using platform-appropriate command""" + # Both macOS and Linux have native ls command + contents = await self.execute.asyn('ls -1 {}'.format(quote(path)), as_root=as_root) + return [x.strip() for x in contents.split('\n') if x.strip()] + + @asyn.asyncf + async def install(self, filepath, timeout=None, with_name=None): + """Install executable using platform-appropriate method""" + executable_name = with_name or os.path.basename(filepath) + on_device_file = self.path.join(self.executables_directory, executable_name) + await self.push.asyn(filepath, on_device_file, timeout=timeout) + + # Both macOS and Linux use chmod + await self.execute.asyn("chmod +x {}".format(quote(on_device_file))) + self._installed_binaries[executable_name] = on_device_file + return on_device_file + + @asyn.asyncf + async def uninstall(self, name): + """Uninstall executable""" + on_device_executable = self.path.join(self.executables_directory, name) + await self.remove.asyn(on_device_executable) + + @asyn.asyncf + async def capture_screen(self, filepath): + """Capture screen using platform-appropriate method""" + timestamp = self.execute('date -u +"%Y-%m-%dT%H:%M:%SZ"').strip() + filepath = filepath.format(ts=timestamp) + + if self._is_macos(): + await self.execute.asyn('screencapture -x {}'.format(quote(filepath))) + elif self._is_linux(): + # Try various Linux screenshot tools + try: + await self.execute.asyn('scrot {}'.format(quote(filepath))) + except TargetStableError: + try: + await self.execute.asyn('gnome-screenshot -f {}'.format(quote(filepath))) + except TargetStableError: + raise TargetStableError('No screenshot tool available (tried scrot, gnome-screenshot)') + else: + raise TargetStableError('Screen capture not supported on this platform') + + def wait_boot_complete(self, timeout=10): + """On local systems, assume boot is always complete""" + pass + + @asyn.asyncf + async def get_pids_of(self, process_name): + """Get PIDs of processes with given name""" + result = [] + try: + if self._is_macos() or self._is_linux(): + # Both have pgrep + output = await self.execute.asyn('pgrep -f {}'.format(quote(process_name))) + for line in output.strip().split('\n'): + if line.strip(): + result.append(int(line.strip())) + else: + # Fallback: parse ps output + ps_output = await self.ps.asyn() + for entry in ps_output: + if process_name in entry.name: + result.append(entry.pid) + except TargetStableError: + # pgrep returns non-zero when no matches found + pass + return result + + @asyn.asyncf + async def ps(self, threads=False, **kwargs): + """Get process list using platform-appropriate command""" + if self._is_macos(): + # macOS ps with BSD-style options + command = 'ps -A -o user,pid,ppid,pcpu,rss,stat,command' + if threads: + command = 'ps -A -M -o user,pid,ppid,pcpu,rss,stat,command' + elif self._is_linux(): + # Linux ps with GNU-style options + command = 'ps -A -o user,pid,ppid,pcpu,rss,stat,command' + if threads: + command = 'ps -A -L -o user,pid,ppid,pcpu,rss,stat,command' + else: + # Fallback to simple ps + command = 'ps' + + lines = iter((await self.execute.asyn(command)).split('\n')) + next(lines) # header + result = [] + for line in lines: + if not line.strip(): + continue + parts = line.strip().split(None, 6) + if len(parts) >= 7: + entry = PsEntry( + user=parts[0], + pid=int(parts[1]), + tid=int(parts[1]), # Use PID as TID for compatibility + ppid=int(parts[2]), + vsize=0, # Not easily available in ps output + rss=int(parts[4]) if parts[4].isdigit() else 0, + wchan='', # Not available + pc='', # Not available + state=parts[5], + name=parts[6] if len(parts) > 6 else '' + ) + result.append(entry) + + # Apply filters if provided + if not kwargs: + return result + else: + filtered_result = [] + for entry in result: + if all(getattr(entry, k) == v for k, v in kwargs.items() if hasattr(entry, k)): + filtered_result.append(entry) + return filtered_result + + def _resolve_paths(self): + """Set up default paths""" + if self.working_directory is None: + self.working_directory = '/tmp/devlib-target' + + +# Keep LocalMacTarget as an alias for backward compatibility +LocalMacTarget = LocalTarget + + def _get_model_name(section): name_string = section['model name'] parts = name_string.split('@')[0].strip().split() diff --git a/tests/test_config.yml b/tests/test_config.yml index 6c5b53ba2..4634df066 100644 --- a/tests/test_config.yml +++ b/tests/test_config.yml @@ -1,5 +1,5 @@ target-configs: entry-0: - LocalLinuxTarget: + LocalTarget: connection_settings: unrooted: True diff --git a/tests/test_target.py b/tests/test_target.py index 2d811321f..fbb0d52ab 100644 --- a/tests/test_target.py +++ b/tests/test_target.py @@ -27,7 +27,7 @@ import os import pytest -from devlib import AndroidTarget, ChromeOsTarget, LinuxTarget, LocalLinuxTarget +from devlib import AndroidTarget, ChromeOsTarget, LinuxTarget, LocalLinuxTarget, LocalTarget from devlib._target_runner import NOPTargetRunner, QEMUTargetRunner from devlib.utils.android import AdbConnection from devlib.utils.misc import load_struct_from_yaml @@ -102,6 +102,11 @@ def build_target_runners(): ll_target = LocalLinuxTarget(connection_settings=target_info['connection_settings']) target_runners.append(NOPTargetRunner(ll_target)) + elif target_class is LocalTarget: + logger.info('> Local target: %s', repr(target_info)) + l_target = LocalTarget(connection_settings=target_info['connection_settings']) + target_runners.append(NOPTargetRunner(l_target)) + elif target_class is QEMUTargetRunner: logger.info('> QEMU target runner: %s', repr(target_info)) From 46ae0c03dd22cb4a953d5361c7c311fbe187b0db Mon Sep 17 00:00:00 2001 From: denini08 Date: Mon, 18 Aug 2025 22:20:05 -0300 Subject: [PATCH 2/3] ref --- devlib/__init__.py | 2 +- devlib/target.py | 281 ++++++++++++++++++++------------------------- 2 files changed, 126 insertions(+), 157 deletions(-) diff --git a/devlib/__init__.py b/devlib/__init__.py index 105e544cd..27e3b2823 100644 --- a/devlib/__init__.py +++ b/devlib/__init__.py @@ -18,7 +18,7 @@ ''' from devlib.target import ( - Target, LinuxTarget, AndroidTarget, LocalLinuxTarget, LocalTarget, + Target, LinuxTarget, AndroidTarget, LocalLinuxTarget, LocalTarget, LocalMacTarget, ChromeOsTarget, ) diff --git a/devlib/target.py b/devlib/target.py index 5b0ea9fc9..a2b536d0f 100644 --- a/devlib/target.py +++ b/devlib/target.py @@ -3138,16 +3138,13 @@ def _resolve_paths(self): self.working_directory = '/tmp/devlib-target' -class LocalTarget(Target): +class LocalTarget(LocalLinuxTarget): """ Auto-detecting local target that works on both Linux and macOS. - This class automatically detects the underlying operating system and provides - appropriate implementations for each platform, using native commands instead - of busybox when needed. + This class inherits from LocalLinuxTarget to reuse Linux implementations + and overrides only the methods that need macOS-specific behavior. """ - - path = posixpath def __init__(self, connection_settings=None, @@ -3204,9 +3201,9 @@ def _is_linux(self): @memoized def abi(self): """Get the system architecture""" - try: - arch = self.execute('uname -m').strip() - if self._is_macos(): + if self._is_macos(): + try: + arch = self.execute('uname -m').strip() # Map macOS architecture names to common names arch_map = { 'arm64': 'arm64', @@ -3214,11 +3211,11 @@ def abi(self): 'i386': 'x86', } return arch_map.get(arch, arch) - else: - # Linux architecture mapping - return ABI_MAP.get(arch, arch) - except TargetStableError: - return 'unknown' + except TargetStableError: + return 'unknown' + else: + # Use parent implementation for Linux systems + return super(LocalTarget, self).abi @property @memoized @@ -3237,19 +3234,9 @@ def os_version(self): } except TargetStableError: return {} - elif self._is_linux(): - try: - os_version = {} - if self.file_exists('/etc/os-release'): - output = self.execute('cat /etc/os-release') - for line in output.split('\n'): - if '=' in line and not line.startswith('#'): - key, value = line.split('=', 1) - os_version[key] = value.strip('"') - return os_version - except TargetStableError: - return {} - return {} + else: + # Use parent implementation for Linux and other systems + return super(LocalTarget, self).os_version @property @memoized @@ -3263,33 +3250,20 @@ def system_id(self): return f'{uuid}/{kernel}' except TargetStableError: return 'unknown' - elif self._is_linux(): - try: - kernel = self.execute('uname -r') - hardware = self.execute('ip a | grep "link/ether" | sed "s/://g" | awk \'{print $2}\' | tr -d "\\n"') - try: - filesystem = self.execute('ls /dev/disk/by-uuid | tr "\\n" "-" | sed "s/-$//"') - except TargetStableError: - filesystem = 'unknown' - return f'{hardware}/{kernel}/{filesystem}' - except TargetStableError: - return 'unknown' - return 'unknown' + else: + # Use parent implementation for Linux systems + return super(LocalTarget, self).system_id @property @memoized def cpuinfo(self): """Get CPU information, using platform-appropriate method""" - try: - # First try the standard Linux approach - return Cpuinfo(self.execute('cat /proc/cpuinfo')) - except TargetStableError: - if self._is_macos(): - # If /proc/cpuinfo doesn't exist (e.g., on macOS), use sysctl - return self._get_macos_cpuinfo() - else: - # For other systems, re-raise the error - raise + if self._is_macos(): + # On macOS, /proc/cpuinfo doesn't exist, use sysctl + return self._get_macos_cpuinfo() + else: + # Use parent implementation for Linux systems + return super(LocalTarget, self).cpuinfo def _get_macos_cpuinfo(self): """Generate /proc/cpuinfo-like output on macOS using sysctl""" @@ -3337,23 +3311,23 @@ def _get_macos_cpuinfo(self): @property def hostname(self): """Get hostname using appropriate command for the platform""" - if self._is_macos() or self._is_linux(): - # Both macOS and Linux have native hostname command + if self._is_macos(): + # macOS has native hostname command return self.execute('hostname').strip() else: - # Fallback to busybox for other systems - return self.execute('{} hostname'.format(self.busybox)).strip() + # Use parent implementation for Linux systems + return super(LocalTarget, self).hostname @property @memoized def kernel_version(self): """Get kernel version using appropriate command for the platform""" - if self._is_macos() or self._is_linux(): - # Both macOS and Linux have native uname command + if self._is_macos(): + # macOS has native uname command return KernelVersion(self.execute('uname -r -v').strip()) else: - # Fallback to busybox for other systems - return KernelVersion(self.execute('{} uname -r -v'.format(quote(self.busybox))).strip()) + # Use parent implementation for Linux systems + return super(LocalTarget, self).kernel_version @property def hostid(self): @@ -3364,21 +3338,15 @@ def hostid(self): except (TargetStableError, ValueError): # hostid might not be available on macOS, return a default return 0 - elif self._is_linux(): - try: - return int(self.execute('hostid').strip(), 16) - except (TargetStableError, ValueError): - # Fallback to busybox - return int(self.execute('{} hostid'.format(self.busybox)).strip(), 16) else: - # Fallback to busybox for other systems - return int(self.execute('{} hostid'.format(self.busybox)).strip(), 16) + # Use parent implementation for Linux systems + return super(LocalTarget, self).hostid @asyn.asyncf async def write_value(self, path, value, verify=True, as_root=True): """Write value to file using platform-appropriate command""" - if self._is_macos() or self._is_linux(): - # Both macOS and Linux have native printf + if self._is_macos(): + # macOS has native printf self.async_manager.track_access( asyn.PathAccess(namespace='target', path=path, mode='w') ) @@ -3416,14 +3384,14 @@ async def write_value(self, path, value, verify=True, as_root=True): 'Could not write "{value}" to {path}, reading "{actual}": {e.output}'.format( value=value, path=path, actual=e.output, e=e)) else: - # Fallback to parent implementation using busybox + # Use parent implementation for Linux systems await super(LocalTarget, self).write_value(path, value, verify, as_root) @asyn.asyncf async def read_tree_values_flat(self, path, depth=1, check_exit_code=True): """Read tree values using platform-appropriate commands""" - if self._is_macos() or self._is_linux(): - # Both macOS and Linux have native find and grep + if self._is_macos(): + # macOS has native find and grep self.async_manager.track_access( asyn.PathAccess(namespace='target', path=path, mode='r') ) @@ -3447,52 +3415,55 @@ async def read_tree_values_flat(self, path, depth=1, check_exit_code=True): result = {k: '\n'.join(v).strip() for k, v in accumulator.items()} return result else: - # Fallback to parent implementation using busybox + # Use parent implementation for Linux systems return await super(LocalTarget, self).read_tree_values_flat(path, depth, check_exit_code) async def _list_directory(self, path, as_root=False): """List directory contents using platform-appropriate command""" - # Both macOS and Linux have native ls command - contents = await self.execute.asyn('ls -1 {}'.format(quote(path)), as_root=as_root) - return [x.strip() for x in contents.split('\n') if x.strip()] + if self._is_macos(): + # macOS has native ls command + contents = await self.execute.asyn('ls -1 {}'.format(quote(path)), as_root=as_root) + return [x.strip() for x in contents.split('\n') if x.strip()] + else: + # Use parent implementation for Linux systems + return await super(LocalTarget, self)._list_directory(path, as_root) @asyn.asyncf async def install(self, filepath, timeout=None, with_name=None): """Install executable using platform-appropriate method""" - executable_name = with_name or os.path.basename(filepath) - on_device_file = self.path.join(self.executables_directory, executable_name) - await self.push.asyn(filepath, on_device_file, timeout=timeout) - - # Both macOS and Linux use chmod - await self.execute.asyn("chmod +x {}".format(quote(on_device_file))) - self._installed_binaries[executable_name] = on_device_file - return on_device_file + if self._is_macos(): + executable_name = with_name or os.path.basename(filepath) + on_device_file = self.path.join(self.executables_directory, executable_name) + await self.push.asyn(filepath, on_device_file, timeout=timeout) + + # macOS uses chmod + await self.execute.asyn("chmod +x {}".format(quote(on_device_file))) + self._installed_binaries[executable_name] = on_device_file + return on_device_file + else: + # Use parent implementation for Linux systems + return await super(LocalTarget, self).install(filepath, timeout, with_name) @asyn.asyncf async def uninstall(self, name): """Uninstall executable""" - on_device_executable = self.path.join(self.executables_directory, name) - await self.remove.asyn(on_device_executable) + if self._is_macos(): + on_device_executable = self.path.join(self.executables_directory, name) + await self.remove.asyn(on_device_executable) + else: + # Use parent implementation for Linux systems + await super(LocalTarget, self).uninstall(name) @asyn.asyncf async def capture_screen(self, filepath): """Capture screen using platform-appropriate method""" - timestamp = self.execute('date -u +"%Y-%m-%dT%H:%M:%SZ"').strip() - filepath = filepath.format(ts=timestamp) - if self._is_macos(): + timestamp = self.execute('date -u +"%Y-%m-%dT%H:%M:%SZ"').strip() + filepath = filepath.format(ts=timestamp) await self.execute.asyn('screencapture -x {}'.format(quote(filepath))) - elif self._is_linux(): - # Try various Linux screenshot tools - try: - await self.execute.asyn('scrot {}'.format(quote(filepath))) - except TargetStableError: - try: - await self.execute.asyn('gnome-screenshot -f {}'.format(quote(filepath))) - except TargetStableError: - raise TargetStableError('No screenshot tool available (tried scrot, gnome-screenshot)') else: - raise TargetStableError('Screen capture not supported on this platform') + # Use parent implementation for Linux systems + await super(LocalTarget, self).capture_screen(filepath) def wait_boot_complete(self, timeout=10): """On local systems, assume boot is always complete""" @@ -3501,24 +3472,21 @@ def wait_boot_complete(self, timeout=10): @asyn.asyncf async def get_pids_of(self, process_name): """Get PIDs of processes with given name""" - result = [] - try: - if self._is_macos() or self._is_linux(): - # Both have pgrep + if self._is_macos(): + result = [] + try: + # macOS has pgrep output = await self.execute.asyn('pgrep -f {}'.format(quote(process_name))) for line in output.strip().split('\n'): if line.strip(): result.append(int(line.strip())) - else: - # Fallback: parse ps output - ps_output = await self.ps.asyn() - for entry in ps_output: - if process_name in entry.name: - result.append(entry.pid) - except TargetStableError: - # pgrep returns non-zero when no matches found - pass - return result + except TargetStableError: + # pgrep returns non-zero when no matches found + pass + return result + else: + # Use parent implementation for Linux systems + return await super(LocalTarget, self).get_pids_of(process_name) @asyn.asyncf async def ps(self, threads=False, **kwargs): @@ -3528,55 +3496,56 @@ async def ps(self, threads=False, **kwargs): command = 'ps -A -o user,pid,ppid,pcpu,rss,stat,command' if threads: command = 'ps -A -M -o user,pid,ppid,pcpu,rss,stat,command' - elif self._is_linux(): - # Linux ps with GNU-style options - command = 'ps -A -o user,pid,ppid,pcpu,rss,stat,command' - if threads: - command = 'ps -A -L -o user,pid,ppid,pcpu,rss,stat,command' - else: - # Fallback to simple ps - command = 'ps' - - lines = iter((await self.execute.asyn(command)).split('\n')) - next(lines) # header - result = [] - for line in lines: - if not line.strip(): - continue - parts = line.strip().split(None, 6) - if len(parts) >= 7: - entry = PsEntry( - user=parts[0], - pid=int(parts[1]), - tid=int(parts[1]), # Use PID as TID for compatibility - ppid=int(parts[2]), - vsize=0, # Not easily available in ps output - rss=int(parts[4]) if parts[4].isdigit() else 0, - wchan='', # Not available - pc='', # Not available - state=parts[5], - name=parts[6] if len(parts) > 6 else '' - ) - result.append(entry) - - # Apply filters if provided - if not kwargs: - return result + + lines = iter((await self.execute.asyn(command)).split('\n')) + next(lines) # header + result = [] + for line in lines: + if not line.strip(): + continue + parts = line.strip().split(None, 6) + if len(parts) >= 7: + entry = PsEntry( + user=parts[0], + pid=int(parts[1]), + tid=int(parts[1]), # Use PID as TID for compatibility + ppid=int(parts[2]), + vsize=0, # Not easily available in ps output + rss=int(parts[4]) if parts[4].isdigit() else 0, + wchan='', # Not available + pc='', # Not available + state=parts[5], + name=parts[6] if len(parts) > 6 else '' + ) + result.append(entry) + + # Apply filters if provided + if not kwargs: + return result + else: + filtered_result = [] + for entry in result: + if all(getattr(entry, k) == v for k, v in kwargs.items() if hasattr(entry, k)): + filtered_result.append(entry) + return filtered_result else: - filtered_result = [] - for entry in result: - if all(getattr(entry, k) == v for k, v in kwargs.items() if hasattr(entry, k)): - filtered_result.append(entry) - return filtered_result + # Use parent implementation for Linux systems + return await super(LocalTarget, self).ps(threads, **kwargs) - def _resolve_paths(self): - """Set up default paths""" - if self.working_directory is None: - self.working_directory = '/tmp/devlib-target' +class LocalMacTarget(LocalTarget): + """ + Local macOS target. + + This class provides macOS-specific implementations while inheriting + from LocalTarget. For auto-detecting behavior, use LocalTarget instead. + """ -# Keep LocalMacTarget as an alias for backward compatibility -LocalMacTarget = LocalTarget + @property + @memoized + def os(self): + """Return 'darwin' for macOS""" + return 'darwin' def _get_model_name(section): From 029a988afbad7b605319b044743d33692db08245 Mon Sep 17 00:00:00 2001 From: denini08 Date: Thu, 21 Aug 2025 16:31:56 -0300 Subject: [PATCH 3/3] refactor: Enhance CPU info handling with structured data support --- devlib/target.py | 121 +++++++++++++++++++++++++++++++---------------- 1 file changed, 81 insertions(+), 40 deletions(-) diff --git a/devlib/target.py b/devlib/target.py index a2b536d0f..ff6ca15ec 100644 --- a/devlib/target.py +++ b/devlib/target.py @@ -2691,7 +2691,13 @@ def charging_enabled(self, enabled): LsmodEntry = namedtuple('LsmodEntry', ['name', 'size', 'use_count', 'used_by']) -class Cpuinfo(object): +class BaseCpuinfo(object): + """ + Base class for CPU information that can be constructed from structured data. + + This class allows creating CPU info objects directly from structured sections + data rather than requiring text parsing. + """ @property @memoized @@ -2719,10 +2725,15 @@ def cpu_names(self): global_name = _get_part_name(section) return [caseless_string(c or global_name) for c in cpu_names] - def __init__(self, text): - self.sections = None + def __init__(self, sections=None): + """ + Initialize with structured sections data. + + :param sections: List of dictionaries, each representing a CPU section + :type sections: list of dict + """ + self.sections = sections or [] self.text = None - self.parse(text) @memoized def get_cpu_features(self, cpuid=0): @@ -2741,6 +2752,38 @@ def get_cpu_features(self, cpuid=0): global_features = section.get('flags').split() return global_features + def __str__(self): + return 'CpuInfo({})'.format(self.cpu_names) + + __repr__ = __str__ + + +class Cpuinfo(BaseCpuinfo): + """ + CPU information class that can parse text format or be constructed from structured data. + + This class maintains backward compatibility by parsing text in __init__ while + also supporting the structured approach via the base class. + """ + + def __init__(self, text=None, sections=None): + """ + Initialize from either text or structured sections data. + + :param text: Text to parse in /proc/cpuinfo format + :type text: str + :param sections: Pre-structured sections data + :type sections: list of dict + """ + if text is not None: + super(Cpuinfo, self).__init__() + self.text = None + self.parse(text) + elif sections is not None: + super(Cpuinfo, self).__init__(sections) + else: + raise ValueError("Either 'text' or 'sections' must be provided") + def parse(self, text): self.sections = [] current_section = {} @@ -2755,11 +2798,6 @@ def parse(self, text): current_section = {} self.sections.append(current_section) - def __str__(self): - return 'CpuInfo({})'.format(self.cpu_names) - - __repr__ = __str__ - class KernelVersion(object): """ @@ -3180,14 +3218,16 @@ def os(self): """Detect the operating system""" try: uname_output = self.execute('uname -s').strip() - if uname_output == 'Darwin': - return 'darwin' - elif uname_output == 'Linux': - return 'linux' - else: - return 'unknown' - except TargetStableError: - return 'unknown' + except Exception as e: + raise TargetStableError("Failed to detect operating system") from e + + if uname_output == "Darwin": + return "darwin" + elif uname_output == "Linux": + return "linux" + else: + raise TargetStableError(f"Unsupported operating system: {uname_output}") + def _is_macos(self): """Check if running on macOS""" @@ -3203,16 +3243,17 @@ def abi(self): """Get the system architecture""" if self._is_macos(): try: - arch = self.execute('uname -m').strip() - # Map macOS architecture names to common names - arch_map = { - 'arm64': 'arm64', - 'x86_64': 'x86_64', - 'i386': 'x86', - } - return arch_map.get(arch, arch) - except TargetStableError: - return 'unknown' + arch = self.execute("uname -m").strip() + except Exception as e: + raise TargetStableError("Failed to detect system architecture on macOS") from e + + # Map macOS architecture names to common names + arch_map = { + "arm64": "arm64", + "x86_64": "x86_64", + "i386": "x86", + } + return arch_map.get(arch, arch) else: # Use parent implementation for Linux systems return super(LocalTarget, self).abi @@ -3236,7 +3277,7 @@ def os_version(self): return {} else: # Use parent implementation for Linux and other systems - return super(LocalTarget, self).os_version + return super().os_version @property @memoized @@ -3249,7 +3290,7 @@ def system_id(self): kernel = self.execute('uname -r').strip() return f'{uuid}/{kernel}' except TargetStableError: - return 'unknown' + raise TargetStableError('Failed to get system identifier on macOS') else: # Use parent implementation for Linux systems return super(LocalTarget, self).system_id @@ -3266,7 +3307,7 @@ def cpuinfo(self): return super(LocalTarget, self).cpuinfo def _get_macos_cpuinfo(self): - """Generate /proc/cpuinfo-like output on macOS using sysctl""" + """Generate CPU information on macOS using sysctl""" # Get CPU count try: ncpu = int(self.execute('sysctl -n hw.ncpu').strip()) @@ -3292,21 +3333,21 @@ def _get_macos_cpuinfo(self): except TargetStableError: pass - # Generate /proc/cpuinfo-like format - cpuinfo_text = [] + # Create structured sections data + sections = [] for i in range(ncpu): - section = [ - f'processor\t: {i}', - f'model name\t: {brand_string}', - ] + section = { + 'processor': str(i), + 'model name': brand_string, + } if cpu_family: - section.append(f'cpu family\t: {cpu_family}') + section['cpu family'] = cpu_family if cpu_model: - section.append(f'model\t\t: {cpu_model}') + section['model'] = cpu_model - cpuinfo_text.append('\n'.join(section)) + sections.append(section) - return Cpuinfo('\n\n'.join(cpuinfo_text)) + return Cpuinfo(sections=sections) @property def hostname(self):