Source code for heimdall.os_related.linux.processes

import logging
from functools import cached_property

from libvmi import LibvmiError

from heimdall.heimdall_client import HeimdallClient
from heimdall.os_related.base_os.processes import Process, Processes
from heimdall.os_related.linux.filesystem import LinuxFileSystem

logger = logging.getLogger(__name__)


class LinuxProcess(Process):
    """
    A single Linux process, exposing its name, PID and executable path.

    All three attributes are read through ``self.ks`` and cached after the
    first access (``cached_property``), so later reads do not go back to
    ``self.ks``.
    """

    @cached_property
    def name(self) -> str:
        """
        Process name.

        Returns
        -------
        str
            The first element of the ``comm`` field of ``self.ks``.
        """
        comm = self.ks.comm
        return comm[0]

    @cached_property
    def pid(self) -> int:
        """
        Process ID (PID).

        Returns
        -------
        int
            The first element of the ``pid`` field of ``self.ks``.
        """
        pid_field = self.ks.pid
        return pid_field[0]

    @cached_property
    def path(self) -> str:
        """
        Full path of the executable file backing the process.

        Returns
        -------
        str
            The path resolved from ``mm -> exe_file -> f_path.dentry``, or
            the process name when any step raises ``LibvmiError``.
        """
        try:
            # Follow the pointer chain one hop at a time; every hop (and the
            # path resolution itself) stays inside the ``try`` so a failed
            # read anywhere falls back to the process name.
            mm = self.ks.mm[0]
            exe_file = mm.exe_file[0]
            dentry = exe_file.f_path.dentry[0]
            return LinuxFileSystem.dentry_full_path(dentry)
        except LibvmiError:
            return self.name
class LinuxProcesses(Processes):
    """
    Manages Linux processes, allowing listing of all processes in the system.
    """

    def __init__(self, heimdall_client: HeimdallClient):
        """
        Initialize the LinuxProcesses manager with a Heimdall client.

        Looks up the ``init_task`` kernel symbol and stores it as
        ``self.proc_list``; ``list()`` starts its walk from that symbol.

        Parameters
        ----------
        heimdall_client : HeimdallClient
            The client used for interacting with the system kernel.
        """
        super().__init__(heimdall_client)
        # NOTE(review): ``self.hc`` is presumably set by the base-class
        # initializer from ``heimdall_client`` -- confirm in ``Processes``.
        init_task = self.hc.kernel.symbols_jar.get_symbol('init_task')
        self.proc_list = self.hc.kernel.file_symbol(init_task.address, init_task.kind)

    def list(self) -> list[LinuxProcess]:
        """
        List all processes.

        Walks the ``tasks`` linked list starting at ``init_task.tasks.next``
        and converts every list entry back into its enclosing
        ``task_struct`` by subtracting the offset of the ``tasks`` field.
        The walk stops when it returns to the first entry. Presumably the
        list is circular through ``init_task``, in which case ``init_task``
        itself is visited last -- confirm against the target kernel.

        If the walk revisits any entry other than the first one (for example
        because the list changed while it was being read), it stops and logs
        a warning instead of looping forever.

        Returns
        -------
        list of LinuxProcess
            A list of all Linux processes, in list-traversal order.
        """
        processes = []
        task_offset = self.hc.kernel.symbols_jar.get_type('task_struct').fields['tasks'].offset

        # Read the starting link once: it is loop-invariant, and re-reading it
        # on every iteration costs an extra pointer dereference each time.
        first_entry = self.proc_list.tasks.next[0]
        first_address = first_entry.address

        visited = set()
        current_entry = first_entry
        while True:
            entry_address = current_entry.address
            if entry_address in visited:
                # A cycle that does not pass through the first entry would
                # otherwise never terminate.
                logger.warning(
                    "Task list walk revisited entry at %#x before returning to the start; stopping",
                    entry_address,
                )
                break
            visited.add(entry_address)

            # ``tasks`` is embedded inside task_struct, so step back by the
            # field offset to reach the start of the containing structure.
            current_proc = self.hc.kernel.symbol(entry_address - task_offset).cast('task_struct')
            processes.append(LinuxProcess(ks=current_proc, ctx=self.hc.create_ctx(current_proc.pid[0])))

            current_entry = current_entry.next[0]
            if current_entry.address == first_address:
                break
        return processes