Source code for rawdisk.filesystems.detector

# -*- coding: utf-8 -*-

"""This module is mostly used by plugins to register filesystem
detection routines that are internaly used by rawdisk.reader.Reader
to match filesystems"""

from collections import defaultdict
import logging
import uuid
import itertools


[docs]class FilesystemDetector(object): """A class that allows to match filesystem id or guid against available plugins. """ def __init__(self, fs_plugins=None): self.logger = logging.getLogger(__name__) # 2 dimensional array of fs_id : [list of plugins] self.__mbr_plugins = defaultdict(list) # 2 dimensional array of fs_guid : [list of plugins] self.__gpt_plugins = defaultdict(list) if fs_plugins is not None: self.__register_plugins(fs_plugins=fs_plugins) def __register_plugins(self, fs_plugins): self.logger.debug('Registering filesystem plugins') for plugin in fs_plugins: gpt_identifiers = plugin.gpt_identifiers mbr_identifiers = plugin.mbr_identifiers for fs_id in mbr_identifiers: self.register_mbr_plugin(fs_id=fs_id, plugin=plugin) for fs_guid in gpt_identifiers: self.register_gpt_plugin(fs_guid=fs_guid, plugin=plugin)
[docs] def clear_plugins(self): self.__mbr_plugins.clear() self.__gpt_plugins.clear()
def __get_plugin_name(self, plugin): return type(plugin).__name__ @property def all_plugins(self): gpt_plugins = self.get_gpt_plugins() mbr_plugins = self.get_mbr_plugins() return gpt_plugins + mbr_plugins
[docs] def get_gpt_plugins(self, fs_guid=None): if fs_guid is None: return list( itertools.chain.from_iterable(self.__gpt_plugins.values())) else: return self.__gpt_plugins.get(fs_guid)
[docs] def get_mbr_plugins(self, fs_id=None): if fs_id is None: return list( itertools.chain.from_iterable(self.__mbr_plugins.values())) else: return self.__mbr_plugins.get(fs_id)
[docs] def register_mbr_plugin(self, fs_id, plugin): """Used in plugin's registration routine, to associate it's detection method with given filesystem id Args: fs_id: filesystem id that is read from MBR partition entry plugin: plugin that supports this filesystem """ self.logger.debug('MBR: {}, FS ID: {}' .format(self.__get_plugin_name(plugin), fs_id)) self.__mbr_plugins[fs_id].append(plugin)
[docs] def register_gpt_plugin(self, fs_guid, plugin): """Used in plugin's registration routine, to associate it's detection method with given filesystem guid Args: fs_guid: filesystem guid that is read from GPT partition entry plugin: plugin that supports this filesystem """ key = uuid.UUID(fs_guid.lower()) self.logger.debug('GPT: {}, GUID: {}' .format(self.__get_plugin_name(plugin), fs_guid)) self.__gpt_plugins[key].append(plugin)
[docs] def detect_standalone(self, filename, offset): for plugin in self.all_plugins: if plugin.detect(filename, offset, standalone=True): return plugin.get_volume_object()
[docs] def detect_mbr(self, filename, offset, fs_id): """Used by rawdisk.session.Session to match mbr partitions against filesystem plugins. Args: filename: device or file that it will read in order to detect the filesystem fs_id: filesystem id to match (ex. 0x07) offset: offset for the filesystem that is being matched Returns: Volume object supplied by matched plugin. If there is no match, None is returned """ self.logger.debug('Detecting MBR partition type') if fs_id not in self.__mbr_plugins: return None else: plugins = self.__mbr_plugins.get(fs_id) for plugin in plugins: if plugin.detect(filename, offset): return plugin.get_volume_object() return None
[docs] def detect_gpt(self, filename, offset, fs_guid): """Used by rawdisk.session.Session to match gpt partitions agains filesystem plugins. Args: filename: device or file that it will read in order to detect the filesystem fs_id: filesystem guid to match (ex. {EBD0A0A2-B9E5-4433-87C0-68B6B72699C7}) offset: offset for the filesystem that is being matched Returns: Volume object supplied by matched plugin. If there is no match, None is returned """ self.logger.debug('Detecting GPT partition type') if fs_guid not in self.__gpt_plugins: return None else: plugins = self.__gpt_plugins.get(fs_guid) for plugin in plugins: if plugin.detect(filename, offset): return plugin.get_volume_object() return None