
Problem

I am writing a simple Pipeline class to diagnose the processing of callables. When I apply the class to generator functions, it cannot track the intermediate timedeltas, because generators are only processed when they are evaluated.

Question

How do you track the execution time for each generator?

I think a new wrapper Generator class is needed to track each generator's execution, analogous to the wrapper used for plain functions.

Any ideas?
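To illustrate the problem, here is a minimal self-contained sketch (the function name is hypothetical): timing the call to a generator function only measures the time to create the generator object, while the actual work happens later, when the generator is consumed.

```python
import time

def slow_squares(n):
    # a generator: work happens lazily, one item per next() call
    for i in range(n):
        time.sleep(0.001)  # simulate per-item work
        yield i * i

t0 = time.perf_counter()
gen = slow_squares(100)               # returns almost instantly: nothing runs yet
creation = time.perf_counter() - t0

t0 = time.perf_counter()
total = sum(gen)                      # all the deferred work happens here
consumption = time.perf_counter() - t0

assert creation < consumption         # wrapping the call times only creation
```

This is exactly why a wrapper around the call site is not enough for generators.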

Prerequisite imports:

from typing import Callable, Generator, Any, Iterable
  • Other imports are shown in the sample codes below.
  • Python version: 3.10

Codes

1. The Pipeline class

1-1. Construction

First, I created a special Iterable that fills in the gaps when zipping unbalanced pairs (for use by the Pipeline class). This is usually unnecessary, but if an error or interruption occurs in the middle of the pipeline, the list of callables may have more entries than the list of Results:

class InfiniteTail(Iterable):
    """
`InfiniteTail` yields the values of Iterable `obj`, then yields `tailed_value` indefinitely once `obj` raises StopIteration.
        
        example usage:
        
        >>> list(zip((1,3,4,5), InfiniteTail((2,3), 0)))
        [(1, 2), (3, 3), (4, 0), (5, 0)]
    """
    def __init__(self, obj: Iterable, tailed_value: Any = None):
        self.__iter = obj.__iter__()
        self.tailed_value = tailed_value
        
    def __next__(self):
        try:
            value = self.__iter.__next__()
        except StopIteration:
            value = self.tailed_value
        return value
    
    def __iter__(self):
        return self

I created a dataclass to store a Result for each Callable in the Pipeline class:

from dataclasses import dataclass
@dataclass
class Result:
    """
        `Result` for expressing result for each `Callable` in `Pipeline` class.
    """
    args: Any = None
    kwargs: Any = None
    result: Any = None
    done: bool = False
    ex: Exception = None

I also defined a wrapper for it:

from functools import wraps
def record_result(f: Callable, results: list[Result]) -> Callable:
    """
`record_result` records `f`'s result and stores it in the `results` list.
    """
    @wraps(f)
    def inner(*args, **kwargs):
        try:
            ret = f(*args, **kwargs)
            results.append(Result(
                    args=args, kwargs=kwargs, 
                    result=ret, done=True, ex=None))
        except Exception as ex:
            results.append(Result(
                    args=args, kwargs=kwargs, 
                    result=None, done=False, ex=ex))
            raise # throw
        return ret
    return inner
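A quick self-contained sanity check of the recording wrapper (the dataclass and wrapper are restated so the snippet runs on its own; note that the except branch stores the caught exception in the `ex` field, which is what makes failed steps diagnosable later):

```python
from dataclasses import dataclass
from functools import wraps
from typing import Any, Callable

@dataclass
class Result:
    args: Any = None
    kwargs: Any = None
    result: Any = None
    done: bool = False
    ex: Exception = None

def record_result(f: Callable, results: list) -> Callable:
    @wraps(f)
    def inner(*args, **kwargs):
        try:
            ret = f(*args, **kwargs)
            results.append(Result(args=args, kwargs=kwargs, result=ret, done=True))
        except Exception as ex:
            results.append(Result(args=args, kwargs=kwargs, ex=ex))
            raise  # re-raise so the pipeline still fails loudly
        return ret
    return inner

results: list = []
traced = record_result(lambda x: x + 1, results)
assert traced(41) == 42
assert results[0].done and results[0].result == 42

try:
    record_result(lambda x: 1 / x, results)(0)
except ZeroDivisionError:
    pass
assert results[1].done is False
assert isinstance(results[1].ex, ZeroDivisionError)
```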

Finally, I created the "Pipeline" class.

from functools import reduce
from IPython.lib.pretty import pretty # for pretty printing with long list/dict etc.
class Pipeline:
    """
`Pipeline` generalizes a flow of `Callable`s
        and adds some diagnostic functionality.
        
        - sample:
        >>> def load_data(n):
        >>>     return list(range(n))
        >>> def filtering(xs, upper=50):
        >>>     return list(x for x in xs if x < upper)
        >>> def aggregate(xs):
        >>>     return sum(xs)
        >>> p = Pipeline([load_data, filtering, aggregate])
        >>> print(p)
        load_data->filtering->aggregate
        >>> p(100)
        1225
        >>> print(p)
        (100,)->load_data[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, ...]]->filtering[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, ...]]->aggregate[1225]
    """
    @classmethod
    def pretty_repr(cls, obj: Any) -> str:
        return pretty(obj, max_seq_length=10)
    
    def __init__(self, fs: list[Callable]):
        self._fs = fs
        self._executions = []
        """
            `executions` contains results of each execution.
        """
        self.__repr_mode = False
        """
            False: only show func names, True: show with the last execution info,
        """
    def __call__(self, stream: Any) -> Any:
        results = []
        try:
            # reduce over self._fs (callables), recording each result and its info.
            ret = reduce(lambda s, f: record_result(f, results)(s), self._fs, stream)
        finally:
            self._executions.append(results)
        return ret
    
    def _set_repr_mode(self):
        """
`_set_repr_mode` decides whether to show execution information.
        """
        if len(self._executions) > 0:
            self.__repr_mode = True # show information
        else:
            self.__repr_mode = False # do not show info, because there is no info.
            
    def _repr_source(self) -> Iterable:
        """
            source iterable for `__repr__`
        """
        source = self._fs
        if self.__repr_mode:
            source = list(zip(source, InfiniteTail(self._executions[-1], Result())))
        return source
    def _element_repr(self, element: Any) -> str:
        """
            element repr expression for `__repr__`
        """
        if self.__repr_mode:
            expr = f"{element[0].__name__}[{self.pretty_repr(element[1].result)}]"
        else:
            expr = element.__name__
        return expr
    def __repr__(self) -> str:
        """
            `__repr__` returns `__repr_source` converted with `__element_repr` 
            joined by the delimiter (arrow sign : "->") 
        """
        self._set_repr_mode() # judge repr mode
        exprs = [] # expression for each callable
        if self.__repr_mode:
            # get the last execution info
            results = self._executions[-1]
            # get the first result in the info if exists
            first : Result = next(iter(results), None)
            # if there is a result, show the first arguments before the expressions.
            if first is not None:
                exprs.append(self.pretty_repr(first.args)) # append the arguments expr
        # append the representation of each element to exprs
        for element in self._repr_source():
            exprs.append(self._element_repr(element))
        # return arrow-joined exprs
        return "->".join(exprs)

1-2. Test the Pipeline class

Sample code to test the Pipeline class:

def load_data(n: int) -> list:
    return list(range(n))
def filtering(xs: list, upper=50) -> list:
    return list(x for x in xs if x < upper)
def aggregate(xs: Iterable) -> int:
    return sum(xs)

p = Pipeline([load_data, filtering, aggregate])
# test title
print(" --- test for the `Pipeline` class --- ")
# show definition
print(f" definition  : {p=}")
# evaluation phase
print(f" evaluation  : {p(100)=}")
# show observation from last result
print(f" observation : {p=}")

And the result:

 --- test for the `Pipeline` class ---
 definition  : p=load_data->filtering->aggregate
 evaluation  : p(100)=1225
 observation : p=(100,)->load_data[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, ...]]->filtering[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, ...]]->aggregate[1225]

OK, this is what I expected.

2. The MeasuredPipeline class

2-1. Construction

Next, I made a simple timer to figure out how long each Callable takes:

from datetime import datetime, timedelta
class Timer:
    """
        simple timer class.
        
        example:
        >>> t = Timer()
        >>> a = (t.start(), list(range(9_000_000)), t.stop()) 
        >>> t.elapsed()
        datetime.timedelta(microseconds=169157)
    """
    _zero_delta = timedelta()
    def __init__(self):
        self._begin = None
        self._end = None
        self._delta = self._zero_delta
    def start(self):
        self._begin = datetime.now()
    def stop(self):
        self._end = datetime.now()
    def elapsed(self) -> timedelta:
        self._delta = self._end - self._begin \
            if self._begin is not None and self._end is not None \
            else self._zero_delta
        return self._delta

To diagnose callables, I made a wrapper too:

from functools import wraps
def timeit_f(f: Callable, measurements: list[timedelta]) -> Callable:
    """
`timeit_f` measures the time `f` takes to produce a result
        and appends that time to the `measurements` list.
    """
    t = Timer()
    @wraps(f)
    def inner(*args, **kwargs):
        t.start()
        ret = f(*args, **kwargs)
        t.stop()
        measurements.append(t.elapsed())
        return ret
    return inner

I created a special mean function for timedelta:

def timedelta_mean(iterable: Iterable[timedelta], default: timedelta = timedelta()) -> timedelta:
    """
        `timedelta_mean` returns the mean of the timedeltas in `iterable`.
        If there is no element, it returns the `default` value instead.
    """
    xs = list(iterable)  # materialize first: arbitrary Iterables have no len()
    return sum(xs, timedelta())/len(xs) if xs else default
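A quick check of the mean helper. Note that `len()` is not defined for arbitrary Iterables (generators in particular), so the version below materializes the input into a list before dividing:

```python
from datetime import timedelta
from typing import Iterable

def timedelta_mean(iterable: Iterable[timedelta],
                   default: timedelta = timedelta()) -> timedelta:
    # materialize first: generators and other Iterables have no len()
    xs = list(iterable)
    return sum(xs, timedelta()) / len(xs) if xs else default

assert timedelta_mean([timedelta(seconds=1), timedelta(seconds=3)]) == timedelta(seconds=2)
assert timedelta_mean([]) == timedelta()
# a generator input also works once materialized
assert timedelta_mean(timedelta(seconds=s) for s in (2, 4)) == timedelta(seconds=3)
```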

Finally, I created the MeasuredPipeline to measure the time it took for each Callable to complete:

class MeasuredPipeline(Pipeline):
    """
`MeasuredPipeline` generalizes a flow of `Callable`s
        and adds some diagnostic functionality. (elapsed time added)
        
        - sample:
        >>> def load_data(n):
        >>>     return list(range(n))
        >>> def filtering(xs, upper=1_0000):
        >>>     return list(x for x in xs if x < upper)
        >>> def aggregate(xs):
        >>>     return sum(xs)
        >>> p = MeasuredPipeline([load_data, filtering, aggregate])
        >>> print(p)
        load_data->filtering->aggregate
        >>> p(1_000_000)
        49995000
        >>> for _ in range(4):
        >>>     p(1_000_000)
        >>> print(p)
        >>> # ([ the mean of `timedelta`s ])@[ the count of executions ]
        (1000000,)->load_data[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, ...]](0:00:00.024525)@5->filtering[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, ...]](0:00:00.027953)@5->aggregate[49995000](0:00:00)@5
    """
    def __init__(self, fs: list[Callable]):
        self._measurements_list = [[] for _ in fs]
        fs = [timeit_f(f, l) for f, l in zip(fs, self._measurements_list)]
        super().__init__(fs)
    
    def _repr_source(self) -> Iterable:
        source = super()._repr_source()
        return list(zip(source, self._measurements_list))
    
    def _element_repr(self, element: Any) -> str:
        elem, l = element
        expr = super()._element_repr(elem)
        return f"{expr}({timedelta_mean(l)})@{len(l)}"

2-2. Test the MeasuredPipeline class

Here is some sample code for the new class. The test is similar to the previous one, but it also shows the average elapsed time for each callable.

from functools import partial
# wraps keeps the callable's `__name__` attribute (partial objects do not have one)
mp = MeasuredPipeline([load_data, wraps(filtering)(partial(filtering, upper=1_0000)), aggregate])

print("\n\n")
# test title
print(" --- test for the `MeasuredPipeline` class --- ")
# show definition
print(f" definition : {mp=}")
# test count
test_count = 5
for i in range(test_count):
    # evaluation phase
    print(f"evaluation@{i+1} : {mp(1_000_000)=}")
# show observation from last result
print(f" observation : {mp=}")

And the result:

--- test for the MeasuredPipeline class ---
definition : mp=load_data(0:00:00)@0->filtering(0:00:00)@0->aggregate(0:00:00)@0
evaluation@1 : mp(1_000_000)=49995000
evaluation@2 : mp(1_000_000)=49995000
evaluation@3 : mp(1_000_000)=49995000
evaluation@4 : mp(1_000_000)=49995000
evaluation@5 : mp(1_000_000)=49995000
observation : mp=(1000000,)->load_data[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, ...]@5->filtering[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, ...]@5
->aggregate49995000@5
Good.

2-3. Test the MeasuredPipeline class (with generators)

Then I test the class for generator version callables:

def load_data_gen(n: int) -> Generator:
    """generator version of `load_data`"""
    return range(n)
def filtering_gen(xs: Generator, upper=1_0000) -> Generator:
    """generator version of `filtering_gen`"""
    return (x for x in xs if x < upper)

mp = MeasuredPipeline([load_data_gen, filtering_gen, aggregate])


print("\n\n")
# test title
print(" --- test for the `MeasuredPipeline` class (generator version) --- ")
# show definition
print(f" definition : {mp=}")
# test count
test_count = 5
for i in range(test_count):
    print(f"evaluation@{i+1} : {mp(1_000_000)=}")
# show observation from last result
print(f" observation : {mp=}")

And the result:

--- test for the MeasuredPipeline class (generator version) ---
definition : mp=load_data_gen(0:00:00)@0->filtering_gen(0:00:00)@0->aggregate(0:00:00)@0
evaluation@1 : mp(1_000_000)=49995000
evaluation@2 : mp(1_000_000)=49995000
evaluation@3 : mp(1_000_000)=49995000
evaluation@4 : mp(1_000_000)=49995000
evaluation@5 : mp(1_000_000)=49995000
observation : mp=(1000000,)->load_data_genrange(0, 1000000)@5->filtering_gen<generator object filtering_gen.. at 0x000001FA608DEB20>@5->aggregate49995000@5

The first two timedelta means are zero because the generators are only processed at evaluation time: all of the deferred work runs during the final aggregation step, so the measured time is concentrated in the last part of the pipeline.
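To sketch the wrapper-Generator idea from the question (names are hypothetical; this is a direction under my own assumptions, not a complete solution): a class that delegates `__next__` to the wrapped generator and accumulates the time spent producing each item, so lazy work gets attributed to the stage that actually performs it.

```python
import time
from datetime import timedelta

class TimedGenerator:
    """Wraps an iterator and records the time spent inside each __next__ call."""
    def __init__(self, gen, measurements: list):
        self._gen = iter(gen)
        self._measurements = measurements  # list[timedelta], shared with the caller
    def __iter__(self):
        return self
    def __next__(self):
        t0 = time.perf_counter()
        try:
            return next(self._gen)
        finally:
            # runs even on the final StopIteration, so the tail is measured too
            self._measurements.append(timedelta(seconds=time.perf_counter() - t0))

def slow_gen(n):
    for i in range(n):
        time.sleep(0.001)  # simulate per-item work
        yield i

measurements: list = []
total = sum(TimedGenerator(slow_gen(5), measurements))
assert total == 10
assert len(measurements) == 6  # 5 yields + the final StopIteration next()
assert sum(measurements, timedelta()) > timedelta()
```

In a `MeasuredPipeline`, a wrapper like `timeit_f` could detect a generator/iterator return value and wrap it in such a class, so the per-item cost lands in that stage's measurement list instead of the consumer's.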

Tags: python · How to diagnose timedelta in each generator in pipeline · Stack Overflow