# Source code for winregrc.task_cache

# -*- coding: utf-8 -*-
"""Task Cache collector."""

import logging

from dfdatetime import filetime as dfdatetime_filetime
from dfdatetime import semantic_time as dfdatetime_semantic_time

from dtfabric import errors as dtfabric_errors

from winregrc import data_format
from winregrc import errors
from winregrc import interface

class CachedTask(object):
  """Cached task.

  Attributes:
    identifier (str): identifier.
    last_registered_time (dfdatetime.DateTimeValues): last registered date and
        time.
    launch_time (dfdatetime.DateTimeValues): launch date and time.
    name (str): name.
  """

  def __init__(self):
    """Initializes a cached task."""
    super(CachedTask, self).__init__()
    self.identifier = None
    self.last_registered_time = None
    self.launch_time = None
    # Restored: the class docstring documents a name attribute and the
    # collector assigns it, but the initializer was truncated to the
    # invalid "self.launch_time = None = None".
    self.name = None
class TaskCacheDataParser(data_format.BinaryDataFormat):
  """Task Cache data parser."""

  _DEFINITION_FILE = 'task_cache.yaml'

  def __init__(self, debug=False, output_writer=None):
    """Initializes a Task Cache data parser.

    Args:
      debug (Optional[bool]): True if debug information should be printed.
      output_writer (Optional[OutputWriter]): output writer.
    """
    super(TaskCacheDataParser, self).__init__()
    self._debug = debug
    self._output_writer = output_writer

  def _ParseFiletime(self, filetime):
    """Parses a FILETIME timestamp value.

    Args:
      filetime (int): a FILETIME timestamp value.

    Returns:
      dfdatetime.DateTimeValues: date and time values.
    """
    if filetime == 0:
      return dfdatetime_semantic_time.SemanticTime(string='Not set')

    # 0x7fffffffffffffff is used by the Task Scheduler to represent "never".
    if filetime == 0x7fffffffffffffff:
      return dfdatetime_semantic_time.SemanticTime(string='Never')

    return dfdatetime_filetime.Filetime(timestamp=filetime)

  def ParseDynamicInfo(self, value_data, cached_task):
    """Parses the DynamicInfo value data.

    Args:
      value_data (bytes): DynamicInfo value data.
      cached_task (CachedTask): cached task.

    Raises:
      ParseError: if the value data could not be parsed.
    """
    if self._debug:
      self._output_writer.WriteDebugData('DynamicInfo value data:', value_data)

    value_data_size = len(value_data)

    # Initialize to None so an unsupported size raises ParseError below
    # instead of a NameError on an unbound data_type_map.
    data_type_map = None
    if value_data_size == 28:
      data_type_map = self._GetDataTypeMap('dynamic_info_record')
    elif value_data_size == 36:
      data_type_map = self._GetDataTypeMap('dynamic_info2_record')

    if not data_type_map:
      raise errors.ParseError(
          f'Unsupported value data size: {value_data_size:d}.')

    try:
      dynamic_info = data_type_map.MapByteStream(value_data)
    except (
        dtfabric_errors.ByteStreamTooSmallError,
        dtfabric_errors.MappingError) as exception:
      raise errors.ParseError(exception)

    cached_task.last_registered_time = self._ParseFiletime(
        dynamic_info.last_registered_time)
    cached_task.launch_time = self._ParseFiletime(dynamic_info.launch_time)

    if self._debug:
      self._output_writer.WriteValue(
          'Unknown1', f'0x{dynamic_info.unknown1:08x}')

      # Note this is likely either the last registered time or
      # the update time.
      self._DebugPrintFiletimeValue(
          'Last registered time', dynamic_info.last_registered_time)

      # Note this is likely the launch time.
      self._DebugPrintFiletimeValue('Launch time', dynamic_info.launch_time)

      self._output_writer.WriteValue(
          'Unknown2', f'0x{dynamic_info.unknown2:08x}')
      self._output_writer.WriteValue(
          'Unknown3', f'0x{dynamic_info.unknown3:08x}')

      # Only the 36-byte dynamic_info2_record carries the trailing unknown
      # time; getattr avoids an AttributeError on the 28-byte variant.
      unknown_time = getattr(dynamic_info, 'unknown_time', None)
      if unknown_time is not None:
        self._DebugPrintFiletimeValue('Unknown time', unknown_time)

      self._output_writer.WriteText('')
class TaskCacheCollector(interface.WindowsRegistryKeyCollector):
  """Task Cache collector.

  Attributes:
    cached_tasks (list[CachedTask]): cached tasks.
  """

  _TASK_CACHE_KEY_PATH = (
      'HKEY_LOCAL_MACHINE\\Software\\Microsoft\\Windows NT\\CurrentVersion\\'
      'Schedule\\TaskCache')

  def __init__(self, debug=False, output_writer=None):
    """Initializes a Task Cache collector.

    Args:
      debug (Optional[bool]): True if debug information should be printed.
      output_writer (Optional[OutputWriter]): output writer.
    """
    super(TaskCacheCollector, self).__init__(debug=debug)
    self._parser = TaskCacheDataParser(
        debug=debug, output_writer=output_writer)
    self._output_writer = output_writer
    self.cached_tasks = []

  def _GetIdValue(self, registry_key):
    """Retrieves the Id value from Task Cache Tree key.

    Args:
      registry_key (dfwinreg.WinRegistryKey): Windows Registry key.

    Yields:
      tuple[dfwinreg.WinRegistryKey, dfwinreg.WinRegistryValue]: Windows
          Registry key and value.
    """
    id_value = registry_key.GetValueByName('Id')
    if id_value:
      yield registry_key, id_value

    # Recurse into subkeys; every Id value in the Tree hierarchy is yielded
    # with the key that contains it.
    for subkey in registry_key.GetSubkeys():
      for value_key, id_value in self._GetIdValue(subkey):
        yield value_key, id_value

  def Collect(self, registry):  # pylint: disable=arguments-differ
    """Collects the Task Cache.

    Args:
      registry (dfwinreg.WinRegistry): Windows Registry.

    Returns:
      bool: True if the Task Cache key was found, False if not.
    """
    dynamic_info_size_error_reported = False

    task_cache_key = registry.GetKeyByPath(self._TASK_CACHE_KEY_PATH)
    if not task_cache_key:
      return False

    tasks_key = task_cache_key.GetSubkeyByName('Tasks')
    tree_key = task_cache_key.GetSubkeyByName('Tree')

    if not tasks_key or not tree_key:
      return False

    # Map task GUID strings (from the Tree Id values) to task names so the
    # Tasks subkeys, which are keyed by GUID, can be resolved to names.
    task_guids = {}
    for subkey in tree_key.GetSubkeys():
      for value_key, id_value in self._GetIdValue(subkey):
        # TODO: improve this check to a regex.
        # The GUID is in the form {%GUID%} and stored an UTF-16 little-endian
        # string and should be 78 bytes in size.
        id_value_data_size = len(id_value.data)
        if id_value_data_size != 78:
          # Fixed: the original message had a format placeholder but no
          # argument and used :s for an integer value.
          logging.error(
              f'Unsupported Id value data size: {id_value_data_size:d}.')
          continue

        guid_string = id_value.GetDataAsObject()
        task_guids[guid_string] = value_key.name

    for subkey in tasks_key.GetSubkeys():
      dynamic_info_value = subkey.GetValueByName('DynamicInfo')
      if not dynamic_info_value:
        continue

      cached_task = CachedTask()
      # The Tasks subkey name is the task GUID; fall back to the GUID when
      # no matching Tree entry was found.
      cached_task.identifier = subkey.name
      cached_task.name = task_guids.get(subkey.name, subkey.name)

      if self._debug:
        if (task_cache_key.last_written_time and
            task_cache_key.last_written_time.timestamp):
          self._output_writer.WriteFiletimeValue(
              'Last written time', task_cache_key.last_written_time.timestamp)

        self._output_writer.WriteValue('Task', cached_task.name)
        self._output_writer.WriteValue('Identifier', cached_task.identifier)
        self._output_writer.WriteText('')

      try:
        self._parser.ParseDynamicInfo(dynamic_info_value.data, cached_task)
      except errors.ParseError as exception:
        # Report the first size error only, to avoid flooding the log when
        # many tasks share the same unsupported record size.
        if not dynamic_info_size_error_reported:
          logging.error(exception)
          dynamic_info_size_error_reported = True
        continue

      self.cached_tasks.append(cached_task)

    return True