llmcompressor.pipelines.cache

Classes:

  • IntermediateValue

    Dataclass which recursively defines offloaded values and which device to onload to

  • IntermediatesCache

    Cache which stores intermediate values (activations) produced by batched, sequential execution of models

IntermediateValue dataclass

IntermediateValue(
    value: Union[Tensor, IntermediateValue, Any],
    device: Union[device, None],
)

Dataclass which recursively defines offloaded values and which device to onload to

Parameters:

  • value

    (Union[Tensor, IntermediateValue, Any]) –

    either an offloaded Tensor, a primitive value, or a recursable value

  • device

    (Union[device, None]) –

    if the value is a Tensor, then the device to onload the tensor to, otherwise None
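IntermediateValue instances are normally created internally by the cache (see from_dataloader below), but a minimal illustrative sketch, assuming the import path matches this module's name, looks like:

import torch

from llmcompressor.pipelines.cache import IntermediateValue

# a tensor stored for later onloading; device records where fetch should move it
hidden = IntermediateValue(value=torch.zeros(2, 8), device=torch.device("cpu"))

# a primitive value carries no device information
use_cache = IntermediateValue(value=False, device=None)

# containers may nest IntermediateValue instances recursively
pair = IntermediateValue(value=(hidden, use_cache), device=None)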

IntermediatesCache

IntermediatesCache(
    batch_intermediates: Optional[
        List[IntermediateValues]
    ] = None,
    offload_device: Optional[device] = "cpu",
)

Cache which stores intermediate values (activations) produced by batched, sequential execution of models. Values are offloaded to the offload_device when stored in the cache and onloaded to their original device when fetched from the cache. If offload_device is None, values will not be offloaded at all.

Currently supports nested offloading of dataclass instances and tuples

Construct using empty and from_dataloader class methods
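A rough end-to-end sketch of the intended usage (key names and tensor shapes are illustrative, not part of the API):

import torch

from llmcompressor.pipelines.cache import IntermediatesCache

# pre-allocate two batch slots; stored tensors are offloaded to CPU
cache = IntermediatesCache.empty(num_batches=2, offload_device=torch.device("cpu"))

# store activations for batch 0 (offloaded on the way in)
cache.update(0, {"hidden_states": torch.randn(2, 16)})

# read them back; tensors are onloaded to their original device
batch0 = cache.fetch(0)

# drop values once they have been consumed
cache.delete(0, consumed_names=["hidden_states"])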

Methods:

  • append

    Append new values to the cache. The new values will be assigned the next available batch index

  • delete

    Delete values from the cache

  • empty

    Construct an empty cache

  • fetch

    Fetch values belonging to a batch

  • from_dataloader

    Initialize a cache with data from the provided dataloader

  • size

    Returns the memory used by cached values, keyed by device, in bytes

  • update

    Update/put values belonging to a batch

Source code in llmcompressor/pipelines/cache.py
def __init__(
    self,
    batch_intermediates: Optional[List[IntermediateValues]] = None,
    offload_device: Optional[torch.device] = "cpu",
):
    self.batch_intermediates = batch_intermediates or []
    self.offload_device = offload_device

append

append(values: Dict[str, Any])

Append new values to the cache. The new values will be assigned the next available batch index

Parameters:

  • values

    (Dict[str, Any]) –

    dictionary mapping keys to values used for update

Source code in llmcompressor/pipelines/cache.py
def append(self, values: Dict[str, Any]):
    """
    Append new values to the cache. The new values will be assigned the next
    available batch index

    :param values: dictionary mapping keys to values used for update
    """
    batch_index = len(self.batch_intermediates)
    self.batch_intermediates.append({})
    self.update(batch_index, values)
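An illustrative sketch (the key name is hypothetical): append behaves like update at the next free batch index.

import torch

from llmcompressor.pipelines.cache import IntermediatesCache

cache = IntermediatesCache()  # starts with zero batches

cache.append({"input_ids": torch.tensor([[1, 2, 3]])})  # stored as batch 0
cache.append({"input_ids": torch.tensor([[4, 5, 6]])})  # stored as batch 1

print(len(cache.batch_intermediates))  # 2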

delete

delete(
    batch_index: int,
    consumed_names: Optional[List[str]] = None,
)

Delete values from the cache

Parameters:

  • batch_index

    (int) –

    index of batch whose values will be deleted

  • consumed_names

    (Optional[List[str]], default: None ) –

    list of keys whose values will be deleted, defaults to removing all keys

Source code in llmcompressor/pipelines/cache.py
def delete(self, batch_index: int, consumed_names: Optional[List[str]] = None):
    """
    Delete values from the cache

    :param batch_index: index of batch whose values will be deleted
    :param consumed_names: list of keys whose values will be deleted, defaults to
        removing all keys
    """
    intermediates = self.batch_intermediates[batch_index]

    if consumed_names is None:
        consumed_names = list(intermediates.keys())

    for name in consumed_names:
        del intermediates[name]
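An illustrative sketch of selective versus full deletion (key names are hypothetical):

import torch

from llmcompressor.pipelines.cache import IntermediatesCache

cache = IntermediatesCache.empty(num_batches=1, offload_device=torch.device("cpu"))
cache.update(0, {"input_ids": torch.ones(1, 4), "attention_mask": torch.ones(1, 4)})

# delete only the keys that have already been consumed
cache.delete(0, consumed_names=["input_ids"])

# omitting consumed_names removes every remaining key for the batch
cache.delete(0)
print(cache.fetch(0))  # {}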

empty classmethod

empty(num_batches: int, offload_device: device)

Construct an empty cache

Parameters:

  • num_batches

    (int) –

    the expected number of batches to be stored

  • offload_device

    (device) –

    device to offload values to

Source code in llmcompressor/pipelines/cache.py
@classmethod
def empty(cls, num_batches: int, offload_device: torch.device):
    """
    Construct an empty cache

    :param num_batches: the expected number of batches to be stored
    :param offload_device: device to offload values to
    """
    batch_intermediates = [{} for _ in range(num_batches)]
    return cls(batch_intermediates, offload_device)
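For example, to reserve slots for eight batches whose stored tensors will be offloaded to CPU (an illustrative sketch):

import torch

from llmcompressor.pipelines.cache import IntermediatesCache

cache = IntermediatesCache.empty(num_batches=8, offload_device=torch.device("cpu"))
print(len(cache.batch_intermediates))  # 8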

fetch

fetch(
    batch_index: int,
    input_names: Optional[List[str]] = None,
) -> Dict[str, Any]

Fetch values belonging to a batch

Parameters:

  • batch_index

    (int) –

    index of batch whose values are being fetched

  • input_names

    (Optional[List[str]], default: None ) –

    list of keys whose values are being fetched

Returns:

  • Dict[str, Any]

    dictionary mapping keys to onloaded values

Source code in llmcompressor/pipelines/cache.py
def fetch(
    self, batch_index: int, input_names: Optional[List[str]] = None
) -> Dict[str, Any]:
    """
    Fetch values belonging to a batch

    :param batch_index: index of batch whose values are being fetched
    :param input_names: list of keys whose values are being fetched
    :return: dictionary mapping keys to onloaded values
    """
    intermediates = self.batch_intermediates[batch_index]

    return {
        key: self._onload_value(subgraph_input)
        for key, subgraph_input in intermediates.items()
        if input_names is None or key in input_names
    }
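An illustrative sketch of fetching a subset of keys (names and shapes are hypothetical):

import torch

from llmcompressor.pipelines.cache import IntermediatesCache

cache = IntermediatesCache.empty(num_batches=1, offload_device=torch.device("cpu"))
cache.update(0, {"hidden_states": torch.randn(2, 8), "position_ids": torch.arange(8)})

# fetch only the named inputs; tensors are onloaded to their original device
inputs = cache.fetch(0, input_names=["hidden_states"])
print(sorted(inputs))  # ['hidden_states']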

from_dataloader classmethod

from_dataloader(
    dataloader: DataLoader,
    model_device: device = torch.device("cpu"),
    mask_padding: bool = True,
    offload_device: Optional[device] = torch.device("cpu"),
)

Initialize a cache with data from the provided dataloader

Parameters:

  • dataloader

    (DataLoader) –

    dataloader which generates values to be cached

  • model_device

    (device, default: device('cpu') ) –

    device which values will be onloaded to when fetched

  • mask_padding

    (bool, default: True ) –

    zero out padding tokens if True. This affects modifiers such as GPTQ and SparseGPT

  • offload_device

    (Optional[device], default: device('cpu') ) –

    device to offload values to

Source code in llmcompressor/pipelines/cache.py
@classmethod
def from_dataloader(
    cls,
    dataloader: torch.utils.data.DataLoader,
    model_device: torch.device = torch.device("cpu"),
    mask_padding: bool = True,
    offload_device: Optional[torch.device] = torch.device("cpu"),
):
    """
    Initialize a cache with data from the provided dataloader

    :param dataloader: dataloader which generates values to be cached
    :param model_device: device which values will be onloaded to when fetched
    :param mask_padding: zero out padding tokens if True. This affects modifiers
        such as GPTQ and SparseGPT
    :param offload_device: device to offload values to
    """
    # note: list comprehension was found to not improve performance
    batch_intermediates = []
    for batch in tqdm(dataloader, desc="Preparing cache"):
        values = {}
        for key, value in batch.items():
            if mask_padding and (key == "input_ids") and "attention_mask" in batch:
                value = cls._mask_padding(value, batch["attention_mask"])
            values[key] = IntermediateValue(value=value, device=model_device)

        batch_intermediates.append(values)

    return cls(batch_intermediates, offload_device)
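An illustrative sketch using a toy map-style dataset (sample contents are hypothetical; PyTorch's default collate turns the list of dict samples into dict batches):

import torch
from torch.utils.data import DataLoader

from llmcompressor.pipelines.cache import IntermediatesCache

samples = [
    {
        "input_ids": torch.tensor([101, 2009, 9999]),
        "attention_mask": torch.tensor([1, 1, 0]),
    }
    for _ in range(4)
]
dataloader = DataLoader(samples, batch_size=2)

cache = IntermediatesCache.from_dataloader(
    dataloader,
    model_device=torch.device("cpu"),   # device tensors are onloaded to when fetched
    mask_padding=True,                  # zero out input_ids at padded positions
    offload_device=torch.device("cpu"),
)

batch = cache.fetch(0)
print(batch["input_ids"])  # last column zeroed because its attention_mask is 0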

size

size() -> Dict[torch.device, int]

Returns the memory used by cached values, keyed by device, in bytes

Returns:

  • Dict[device, int]

    dictionary mapping torch device to number of bytes in cache

Source code in llmcompressor/pipelines/cache.py
def size(self) -> Dict[torch.device, int]:
    """
    Returns the memory used by cached values, keyed by device, in bytes

    :return: dictionary mapping torch device to number of bytes in cache
    """
    sizes = defaultdict(lambda: 0)

    def _size_helper(intermediate: IntermediateValue) -> int:
        value = intermediate.value

        if isinstance(value, torch.Tensor):
            sizes[value.device] += value.nbytes

        elif is_dataclass(value):
            for field in fields(value):
                _size_helper(getattr(value, field.name))

        elif isinstance(value, (tuple, list)):
            for v in value:
                _size_helper(v)

        elif isinstance(value, dict):
            for v in value.values():
                _size_helper(v)

        else:
            sizes[torch.device("cpu")] += sys.getsizeof(value, 0)

    for intermediates in self.batch_intermediates:
        for value in intermediates.values():
            _size_helper(value)

    return dict(sizes)
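An illustrative sketch (key name and shape are hypothetical; 4 × 8 float32 values occupy 128 bytes):

import torch

from llmcompressor.pipelines.cache import IntermediatesCache

cache = IntermediatesCache.empty(num_batches=1, offload_device=torch.device("cpu"))
cache.update(0, {"hidden_states": torch.zeros(4, 8, dtype=torch.float32)})

print(cache.size())  # {device(type='cpu'): 128}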

update

update(batch_index: int, values: Dict[str, Any])

Update/put values belonging to a batch

Parameters:

  • batch_index

    (int) –

    index of batch whose values will be updated

  • values

    (Dict[str, Any]) –

    dictionary mapping keys to values used for update

Source code in llmcompressor/pipelines/cache.py
def update(self, batch_index: int, values: Dict[str, Any]):
    """
    Update/put values belonging to a batch

    :param batch_index: index of batch whose values will be updated
    :param values: dictionary mapping keys to values used for update
    """
    intermediates = {k: self._offload_value(v) for k, v in values.items()}
    self.batch_intermediates[batch_index].update(intermediates)
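An illustrative sketch (key names are hypothetical): repeated updates to the same batch index merge into the existing entry rather than replacing it wholesale.

import torch

from llmcompressor.pipelines.cache import IntermediatesCache

cache = IntermediatesCache.empty(num_batches=2, offload_device=torch.device("cpu"))

cache.update(1, {"hidden_states": torch.randn(2, 16)})
cache.update(1, {"position_ids": torch.arange(16)})  # adds a key, keeps the rest

print(sorted(cache.fetch(1)))  # ['hidden_states', 'position_ids']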