Objective

I am creating a simple Pipeline class to diagnose the processing of callables. When I apply the class to generator functions, it cannot track the intermediate timedeltas, because generators only do their work when they are evaluated.

Question

How can I track the execution time of each generator?
I think a new wrapper Generator class is needed to track each generator's execution, just like the wrapper for ordinary functions.
Any ideas?
Prerequisite imports:

from typing import Callable, Generator, Any, Iterable

- Other imports are shown in the sample code below.
- Python version: 3.10
Codes

1. The Pipeline class

1-1. Construction

First, I created a special Iterable to fill the gaps when zipping unbalanced pairs (for the Pipeline class). This is usually unnecessary, but if there is an error or interruption in the middle of the pipeline, the information about the callables may not include some of the Results:
class InfiniteTail(Iterable):
    """
    `InfiniteTail` yields `tailed_value` (infinitely) in place of values
    from `obj` once `obj` raises StopIteration.

    example usage:
    >>> list(zip((1, 3, 4, 5), InfiniteTail((2, 3), 0)))
    [(1, 2), (3, 3), (4, 0), (5, 0)]
    """
    def __init__(self, obj: Iterable, tailed_value: Any = None):
        self.__iter = iter(obj)
        self.tailed_value = tailed_value

    def __next__(self):
        try:
            value = next(self.__iter)
        except StopIteration:
            value = self.tailed_value
        return value

    def __iter__(self):
        return self
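As an aside, the standard library's `itertools.zip_longest` with a `fillvalue` produces the same pairing, which could replace `InfiniteTail` for this particular use case:

```python
# For comparison: `zip_longest` pads the shorter iterable with `fillvalue`,
# matching the behavior of zipping against `InfiniteTail((2, 3), 0)`.
from itertools import zip_longest

pairs = list(zip_longest((1, 3, 4, 5), (2, 3), fillvalue=0))
print(pairs)  # [(1, 2), (3, 3), (4, 0), (5, 0)]
```

`InfiniteTail` is still useful if the fill value should be a fresh object per slot or the padding must be attached to one specific iterable.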
I created a dataclass to store a Result for each Callable in the Pipeline class:
from dataclasses import dataclass

@dataclass
class Result:
    """
    `Result` expresses the result of each `Callable` in the `Pipeline` class.
    """
    args: Any = None
    kwargs: Any = None
    result: Any = None
    done: bool = False
    ex: Exception | None = None
I also defined a wrapper for it:

from functools import wraps

def record_result(f: Callable, results: list[Result]) -> Callable:
    """
    `record_result` wraps `f`, recording each call's result (or exception)
    in the `results` list.
    """
    @wraps(f)
    def inner(*args, **kwargs):
        try:
            ret = f(*args, **kwargs)
            results.append(Result(
                args=args, kwargs=kwargs,
                result=ret, done=True, ex=None))
        except Exception as ex:
            results.append(Result(
                args=args, kwargs=kwargs,
                result=None, done=False, ex=ex))  # store the exception itself, not None
            raise  # re-throw
        return ret
    return inner
Finally, I created the "Pipeline" class.

from functools import reduce
from IPython.lib.pretty import pretty  # for pretty-printing long lists/dicts etc.

class Pipeline:
    """
    `Pipeline` generalizes a flow of `Callable`s and adds some
    diagnostic functionality.
    - sample:
    >>> def load_data(n):
    >>>     return list(range(n))
    >>> def filtering(xs, upper=50):
    >>>     return list(x for x in xs if x < upper)
    >>> def aggregate(xs):
    >>>     return sum(xs)
    >>> p = Pipeline([load_data, filtering, aggregate])
    >>> print(p)
    load_data->filtering->aggregate
    >>> p(100)
    1225
    >>> print(p)
    (100,)->load_data[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, ...]]->filtering[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, ...]]->aggregate[1225]
    """
    @classmethod
    def pretty_repr(cls, obj: Any) -> str:
        return pretty(obj, max_seq_length=10)

    def __init__(self, fs: list[Callable]):
        self._fs = fs
        self._executions = []
        """`_executions` contains the results of each execution."""
        self.__repr_mode = False
        """False: show only function names; True: show the last execution's info."""

    def __call__(self, stream: Any) -> Any:
        results = []
        try:
            # reduce over self._fs (the callables), recording each result.
            ret = reduce(lambda s, f: record_result(f, results)(s), self._fs, stream)
        finally:
            # record partial results even if an exception propagates
            self._executions.append(results)
        return ret

    def _set_repr_mode(self):
        """`_set_repr_mode` decides whether to show execution information."""
        # show info only if at least one execution has been recorded
        self.__repr_mode = len(self._executions) > 0

    def _repr_source(self) -> Iterable:
        """source iterable for `__repr__`"""
        source = self._fs
        if self.__repr_mode:
            source = list(zip(source, InfiniteTail(self._executions[-1], Result())))
        return source

    def _element_repr(self, element: Any) -> str:
        """repr expression of one element for `__repr__`"""
        if self.__repr_mode:
            expr = f"{element[0].__name__}[{self.pretty_repr(element[1].result)}]"
        else:
            expr = element.__name__
        return expr

    def __repr__(self) -> str:
        """
        `__repr__` returns `_repr_source` converted with `_element_repr`,
        joined by the arrow delimiter ("->").
        """
        self._set_repr_mode()  # decide repr mode
        exprs = []  # expression for each callable
        if self.__repr_mode:
            # get the last execution info
            results = self._executions[-1]
            # get the first result in the info, if it exists
            first: Result | None = next(iter(results), None)
            # if there is a result, show its arguments before the expressions.
            if first is not None:
                exprs.append(self.pretty_repr(first.args))  # append the arguments expr
        # append the representation of each element to exprs
        for element in self._repr_source():
            exprs.append(self._element_repr(element))
        # return arrow-joined exprs
        return "->".join(exprs)
1-2. Test the Pipeline class

Sample code to test the Pipeline class:
def load_data(n: int) -> list:
    return list(range(n))

def filtering(xs: list, upper=50) -> list:
    return list(x for x in xs if x < upper)

def aggregate(xs: Iterable) -> int:
    return sum(xs)

p = Pipeline([load_data, filtering, aggregate])

# test title
print(" --- test for the `Pipeline` class --- ")
# show definition
print(f" definition : {p=}")
# evaluation phase
print(f" evaluation : {p(100)=}")
# show observation from the last result
print(f" observation : {p=}")
And the result:

 --- test for the `Pipeline` class --- 
 definition : p=load_data->filtering->aggregate
 evaluation : p(100)=1225
 observation : p=(100,)->load_data[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, ...]]->filtering[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, ...]]->aggregate[1225]
OK, this is what I expected.
2. The MeasuredPipeline class

2-1. Construction

Next, I made a simple timer to figure out how long each Callable takes:
from datetime import datetime, timedelta

class Timer:
    """
    simple timer class.
    example:
    >>> t = Timer()
    >>> a = (t.start(), list(range(9_000_000)), t.stop())
    >>> t.elapsed()
    datetime.timedelta(microseconds=169157)
    """
    _zero_delta = timedelta()

    def __init__(self):
        self._begin = None
        self._end = None
        self._delta = self._zero_delta

    def start(self):
        self._begin = datetime.now()

    def stop(self):
        self._end = datetime.now()

    def elapsed(self) -> timedelta:
        self._delta = self._end - self._begin \
            if self._begin is not None and self._end is not None \
            else self._zero_delta
        return self._delta
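A possible variant (a sketch, not part of the classes above): the same timer written as a context manager, which avoids the tuple trick in the docstring example. The name `CtxTimer` is illustrative.

```python
from datetime import datetime, timedelta

class CtxTimer:
    """Context-manager timer: start on __enter__, stop on __exit__."""
    def __init__(self):
        self._begin = None
        self._end = None

    def __enter__(self):
        self._begin = datetime.now()
        return self

    def __exit__(self, *exc):
        self._end = datetime.now()
        return False  # never swallow exceptions

    def elapsed(self) -> timedelta:
        if self._begin is None or self._end is None:
            return timedelta()
        return self._end - self._begin

with CtxTimer() as t:
    sum(range(1_000_000))
print(t.elapsed() >= timedelta())  # True
```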
To diagnose callables, I made a wrapper for this too:

from functools import wraps

def timeit_f(f: Callable, measurements: list[timedelta]) -> Callable:
    """
    `timeit_f` measures the time it takes to retrieve a result from `f`
    and appends that time to the `measurements` list.
    """
    t = Timer()

    @wraps(f)
    def inner(*args, **kwargs):
        t.start()
        ret = f(*args, **kwargs)
        t.stop()
        measurements.append(t.elapsed())
        return ret
    return inner
I created a special mean function for timedelta:

def timedelta_mean(ds: list[timedelta], default: timedelta = timedelta()) -> timedelta:
    """
    `timedelta_mean` returns the mean of the timedeltas in `ds`.
    If there is no element, it returns the `default` value instead.
    """
    # note: `len()` needs a sized container, hence `list` rather than `Iterable`.
    return sum(ds, timedelta()) / len(ds) if len(ds) > 0 else default
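A quick sanity check of the mean (definition repeated so the snippet runs standalone):

```python
from datetime import timedelta

def timedelta_mean(ds, default=timedelta()):
    # mean of timedeltas; `timedelta` supports division by an int
    return sum(ds, timedelta()) / len(ds) if len(ds) > 0 else default

print(timedelta_mean([timedelta(seconds=1), timedelta(seconds=3)]))  # 0:00:02
print(timedelta_mean([]))  # 0:00:00
```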
Finally, I created the MeasuredPipeline class to measure the time it takes each Callable to complete:
class MeasuredPipeline(Pipeline):
    """
    `MeasuredPipeline` generalizes a flow of `Callable`s and adds some
    diagnostic functionality (elapsed time included).
    - sample:
    >>> def load_data(n):
    >>>     return list(range(n))
    >>> def filtering(xs, upper=1_0000):
    >>>     return list(x for x in xs if x < upper)
    >>> def aggregate(xs):
    >>>     return sum(xs)
    >>> p = MeasuredPipeline([load_data, filtering, aggregate])
    >>> print(p)
    load_data(0:00:00)@0->filtering(0:00:00)@0->aggregate(0:00:00)@0
    >>> p(1_000_000)
    49995000
    >>> for _ in range(4):
    >>>     p(1_000_000)
    >>> print(p)
    >>> # ([ the mean of the `timedelta`s ])@[ the count of executions ]
    (1000000,)->load_data[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, ...]](0:00:00.024525)@5->filtering[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, ...]](0:00:00.027953)@5->aggregate[49995000](0:00:00)@5
    """
    def __init__(self, fs: list[Callable]):
        self._measurements_list = [[] for _ in fs]
        fs = [timeit_f(f, l) for f, l in zip(fs, self._measurements_list)]
        super().__init__(fs)

    def _repr_source(self) -> Iterable:
        source = super()._repr_source()
        return list(zip(source, self._measurements_list))

    def _element_repr(self, element: Any) -> str:
        elem, l = element
        expr = super()._element_repr(elem)
        return f"{expr}({timedelta_mean(l)})@{len(l)}"
2-2. Test the MeasuredPipeline class

Here's some sample code for the new class. It is similar to the previous test, but it shows the average elapsed time for each callable.
from functools import partial

# wrap to keep the callable's `__name__` attribute (partial objects do not have it)
mp = MeasuredPipeline([load_data, wraps(filtering)(partial(filtering, upper=1_0000)), aggregate])

print("\n\n")
# test title
print(" --- test for the `MeasuredPipeline` class --- ")
# show definition
print(f" definition : {mp=}")
# test count
test_count = 5
for i in range(test_count):
    # evaluation phase
    print(f"evaluation@{i+1} : {mp(1_000_000)=}")
# show observation from the last result
print(f" observation : {mp=}")
And the result (the mean timedeltas in the last line were lost in copying and are elided here as "(…)"):

 --- test for the `MeasuredPipeline` class --- 
 definition : mp=load_data(0:00:00)@0->filtering(0:00:00)@0->aggregate(0:00:00)@0
evaluation@1 : mp(1_000_000)=49995000
evaluation@2 : mp(1_000_000)=49995000
evaluation@3 : mp(1_000_000)=49995000
evaluation@4 : mp(1_000_000)=49995000
evaluation@5 : mp(1_000_000)=49995000
 observation : mp=(1000000,)->load_data[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, ...]](…)@5->filtering[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, ...]](…)@5->aggregate[49995000](…)@5
Good.
2-3. Test the MeasuredPipeline class (with generators)

Then I test the class with generator-style callables:
def load_data_gen(n: int) -> Iterable:
    """lazy version of `load_data` (`range` is evaluated lazily, like a generator)"""
    return range(n)

def filtering_gen(xs: Iterable, upper=1_0000) -> Generator:
    """generator version of `filtering`"""
    return (x for x in xs if x < upper)

mp = MeasuredPipeline([load_data_gen, filtering_gen, aggregate])
print("\n\n")
# test title
print(" --- test for the `MeasuredPipeline` class (generator version) --- ")
# show definition
print(f" definition : {mp=}")
# test count
test_count = 5
for i in range(test_count):
    print(f"evaluation@{i+1} : {mp(1_000_000)=}")
# show observation from the last result
print(f" observation : {mp=}")
Then the result (the aggregate's mean timedelta was lost in copying and is elided here as "(…)"):

 --- test for the `MeasuredPipeline` class (generator version) --- 
 definition : mp=load_data_gen(0:00:00)@0->filtering_gen(0:00:00)@0->aggregate(0:00:00)@0
evaluation@1 : mp(1_000_000)=49995000
evaluation@2 : mp(1_000_000)=49995000
evaluation@3 : mp(1_000_000)=49995000
evaluation@4 : mp(1_000_000)=49995000
evaluation@5 : mp(1_000_000)=49995000
 observation : mp=(1000000,)->load_data_gen[range(0, 1000000)](0:00:00)@5->filtering_gen[<generator object filtering_gen.. at 0x000001FA608DEB20>](0:00:00)@5->aggregate[49995000](…)@5
The first two timedelta means are zero because the generators only run when they are evaluated; all of the deferred work happens during the aggregation phase, so the measured time piles up on the last part of the pipeline.
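One possible direction, sketched under the assumption that lazy work should be attributed to the stage that produced it: wrap each intermediate iterable in a timing iterator that accumulates the time spent inside its own `__next__` calls, and flushes the total once the source is exhausted. The names `TimedIter`, `m_load`, and `m_filter` are illustrative, not part of the classes above.

```python
from datetime import datetime, timedelta
from typing import Any, Iterable, Iterator

class TimedIter(Iterator):
    """Wraps an iterable and accumulates the time spent pulling items
    from it; the total is appended to `sink` on exhaustion."""
    def __init__(self, inner: Iterable, sink: list):
        self._it = iter(inner)
        self._sink = sink
        self._total = timedelta()

    def __iter__(self):
        return self

    def __next__(self) -> Any:
        begin = datetime.now()
        try:
            value = next(self._it)
        except StopIteration:
            self._total += datetime.now() - begin
            self._sink.append(self._total)  # flush the total when exhausted
            raise
        self._total += datetime.now() - begin
        return value

# generator-style stages, as in the question (smaller n to keep the demo fast)
def load_data_gen(n):
    return range(n)

def filtering_gen(xs, upper=1_0000):
    return (x for x in xs if x < upper)

m_load: list[timedelta] = []
m_filter: list[timedelta] = []
data = TimedIter(load_data_gen(100_000), m_load)
filtered = TimedIter(filtering_gen(data), m_filter)
total = sum(filtered)
print(total)                       # 49995000
print(len(m_load), len(m_filter))  # 1 1
```

One caveat: since the wrappers nest, the filter stage's total includes the time spent pulling from the load stage; a per-stage figure would be obtained by subtracting the upstream total. To integrate this into `MeasuredPipeline`, a `timeit_f` variant could check whether `f`'s return value is an iterator and, if so, wrap it in `TimedIter` before passing it downstream.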