Source code for heimdall.cli.isf_manager

import base64
import json
import logging
import lzma
import os
import tempfile
from pathlib import Path
from typing import Optional, Tuple

import requests
from libvmi import Libvmi, LibvmiError

from heimdall.exceptions import SymbolsFileNotFoundError, SymbolsFileRemoteNotFoundError
from heimdall.utils.utils import download_file

logger = logging.getLogger(__name__)
MAC_AND_LINUX_REPO = 'https://raw.githubusercontent.com/Abyss-W4tcher/volatility3-symbols/master/banners/banners.json'
CHUNK_SIZE = 0x200000
MACOS_BANNER_SIGNATURE = b'Darwin Kernel Version'
LINUX_BANNER_SIGNATURE = b'Linux version'


[docs] class ISFManager: """Manager class for handling ISF profiles."""
[docs] def __init__(self, profiles_directory: Path) -> None: """ Initialize ISFManager with the provided profile's directory. Parameters ---------- profiles_directory : Path The directory where ISF profiles are stored. """ self.profiles_directory = profiles_directory
[docs] def list(self) -> dict[str, Path]: """ List all available ISF profiles. Returns ------- dict of str : Path A dictionary mapping profile names to their file paths. Raises ------ SymbolsFileNotFoundError If the profiles directory does not exist. """ profiles = {} if not self.profiles_directory.exists(): raise SymbolsFileNotFoundError() for profile_file in self.profiles_directory.iterdir(): profiles[profile_file.stem] = profile_file return profiles
[docs] def delete(self, profile_name: str) -> None: """ Delete a specific ISF profile. Parameters ---------- profile_name : str The name of the profile to delete. Raises ------ FileExistsError If the specified profile does not exist. """ profile_path = Path(self.profiles_directory / profile_name).with_suffix('.json') if not profile_path.exists(): raise FileExistsError() profile_path.unlink() logger.info(f'Deleted: {profile_name}')
[docs] def purge(self) -> None: """ Purge all ISF profiles. Deletes all profiles found in the profiles directory. """ for profile_name, profile_path in self.list().items(): self.delete(profile_name) logger.info(f'Deleted all profiles in: {self.profiles_directory}')
[docs] @staticmethod def detect(vm_name: str, kvmi_socket: dict) -> Optional[Tuple[str, str]]: """ Detect the kernel signature of a VM. Parameters ---------- vm_name : str The name of the virtual machine. kvmi_socket : dict KVMI socket data for communication with the VM. Returns ------- tuple of (str, str) or None A tuple containing the decoded banner and its base64-encoded version, or None if detection fails. """ logger.info(f'Scanning for kernel signature: {MACOS_BANNER_SIGNATURE} | {LINUX_BANNER_SIGNATURE}') with Libvmi(vm_name, init_data=kvmi_socket, partial=True) as vmi: max_physical_address = vmi.get_max_physical_address() for address in range(0, max_physical_address, CHUNK_SIZE): if banner := ISFManager._read_banner(vmi, address): return banner.decode('utf-8'), base64.b64encode(banner).decode() logger.error('Unable to determine OS') return None
@staticmethod def _read_banner(vmi: Libvmi, address: int) -> Optional[bytes]: """ Read the kernel banner from memory. Parameters ---------- vmi : Libvmi The Libvmi instance for interacting with VM memory. address : int The memory address to start reading from. Returns ------- bytes or None The kernel banner as bytes, or None if the banner cannot be read. """ try: data = vmi.read_pa(address, CHUNK_SIZE)[0] if MACOS_BANNER_SIGNATURE in data: banner = data[data.index(MACOS_BANNER_SIGNATURE):].split(b'\x00', 1)[0] + b'\x00\n' return banner elif LINUX_BANNER_SIGNATURE in data: banner = data[data.index(LINUX_BANNER_SIGNATURE):].split(b'\n', 1)[0] + b'\x00\n' return banner except LibvmiError: pass return None @staticmethod def _extract(source: str, dest: str) -> None: """ Extract JSON data from a xz compressed file. Parameters ---------- source : str Path to the xz compressed source file. dest : str Path to the destination file for extracted JSON data. """ with lzma.open(source) as xz: data = json.load(xz) with open(dest, 'w') as dest_file: json.dump(data, dest_file)
[docs] def download(self, banner: str, outfile: str) -> None: """ Download and extract an ISF profile. Parameters ---------- banner : str Base64-encoded OS banner to identify the ISF file. outfile : str Name for the downloaded ISF file. Raises ------ SymbolsFileRemoteNotFoundError If the banner is not found in the repository. """ banners = requests.get(MAC_AND_LINUX_REPO).json() if banner in banners['mac']: download_url = banners['mac'][banner][0] elif banner in banners['linux']: download_url = banners['linux'][banner][0] else: raise SymbolsFileRemoteNotFoundError(banner) filename = os.path.basename(download_url) outfile_path = self.profiles_directory / f'{outfile}.json' with tempfile.TemporaryDirectory() as tmpdir: tmpdir_path = Path(tmpdir) xz_file = tmpdir_path / filename self.profiles_directory.mkdir(exist_ok=True, parents=True) download_file(download_url, xz_file) logger.info('Extracting symbols file (may take a while)...') self._extract(xz_file, outfile_path) logger.info(f'Symbols file is now available {outfile_path}')
[docs] def create(self, vm_name: str, kvmi_socket: dict) -> None: """ Create an ISF profile for a specified VM. Parameters ---------- vm_name : str The name of the virtual machine. kvmi_socket : dict KVMI socket data for communication with the VM. """ result = self.detect(vm_name, kvmi_socket) if result: banner, banner_b64 = result self.download(banner_b64, vm_name) else: logger.error('Profile creation failed due to detection failure.')