0
点赞
收藏
分享

微信扫一扫

Python入门(7)--高级函数特性详解

老牛走世界 2024-11-21 阅读 8

Python高级函数特性详解 🚀

目录

  1. 匿名函数(Lambda)
  2. 装饰器的使用
  3. 生成器与迭代器
  4. 递归函数应用
  5. 实战案例:文件批处理工具

1. 匿名函数(Lambda)深入解析 🎯

1.1 Lambda函数基础与进阶

1.1.1 基本语法与类型注解
from typing import Callable, List, Any, Dict, Optional, Union

# 基础lambda函数(带类型注解)
square: Callable[[int], int] = lambda x: x ** 2
greeting: Callable[[str, Optional[str]], str] = lambda name, msg="你好": f"{msg}, {name}!"

# 带类型注解的复杂lambda函数
process_data: Callable[[Dict[str, Any]], List[Any]] = lambda d: [v for k, v in d.items() if isinstance(v, (int, float))]
1.1.2 函数式编程应用
# 1. 组合多个lambda函数
compose = lambda f, g: lambda x: f(g(x))
double = lambda x: x * 2
increment = lambda x: x + 1

# 先加1再乘2
double_after_increment = compose(double, increment)
print(double_after_increment(3))  # 输出: 8

# 2. 柯里化示例
curry = lambda f: lambda x: lambda y: f(x, y)
add = lambda x, y: x + y
curried_add = curry(add)
increment = curried_add(1)
print(increment(5))  # 输出: 6

# 3. 偏函数应用
from functools import partial
multiply = lambda x, y: x * y
double = partial(multiply, 2)
print(double(4))  # 输出: 8

1.2 Lambda函数实战应用

1.2.1 数据处理与转换
# 1. 复杂数据结构处理
users = [
    {"name": "张三", "age": 25, "salary": 8000, "department": "技术"},
    {"name": "李四", "age": 30, "salary": 12000, "department": "销售"},
    {"name": "王五", "age": 28, "salary": 15000, "department": "技术"},
    {"name": "赵六", "age": 35, "salary": 20000, "department": "管理"}
]

# 按多个条件排序(先按部门,再按薪资降序)
sorted_users = sorted(
    users,
    key=lambda u: (u["department"], -u["salary"])
)

# 数据转换和筛选
tech_salaries = list(map(
    lambda u: u["salary"],
    filter(lambda u: u["department"] == "技术", users)
))

# 2. 数据聚合
from collections import defaultdict

# 按部门统计平均薪资
dept_avg_salary = defaultdict(list)
for user in users:
    dept_avg_salary[user["department"]].append(user["salary"])

dept_averages = {
    dept: sum(salaries) / len(salaries)
    for dept, salaries in dept_avg_salary.items()
}
1.2.2 事件处理与回调
class EventSystem:
    def __init__(self):
        self.handlers: Dict[str, List[Callable]] = defaultdict(list)

    def register(self, event_name: str, handler: Callable) -> None:
        self.handlers[event_name].append(handler)

    def trigger(self, event_name: str, *args, **kwargs) -> None:
        for handler in self.handlers[event_name]:
            handler(*args, **kwargs)

# 使用lambda作为事件处理器
event_system = EventSystem()

# 注册多个简单的事件处理器
event_system.register("user_login", lambda user: print(f"用户 {user} 登录"))
event_system.register("user_login", lambda user: print(f"发送欢迎邮件给 {user}"))

# 注册带条件判断的处理器
event_system.register(
    "payment",
    lambda amount, user: print(f"大额支付警告: {user} 支付了 {amount}元")
    if amount > 10000 else None
)

1.3 Lambda函数最佳实践与性能考虑

1.3.1 性能优化技巧
from timeit import timeit
import operator

# 1. 使用operator模块替代简单lambda
numbers = list(range(1000))

# 不推荐
lambda_time = timeit(lambda: list(map(lambda x: x + 1, numbers)), number=1000)

# 推荐
operator_time = timeit(lambda: list(map(operator.add, numbers, [1]*len(numbers))), number=1000)

# 2. 缓存计算结果
from functools import lru_cache

# 使用缓存装饰器包装lambda
cached_computation = lru_cache(maxsize=128)(lambda x: sum(i * i for i in range(x)))
1.3.2 代码可维护性建议
# ✅ 适合使用Lambda的场景
# 1. 简单的键函数
sorted_items = sorted(items, key=lambda x: x.priority)

# 2. 简单的数据转换
transformed = map(lambda x: x.upper(), items)

# 3. 简单的过滤条件
filtered = filter(lambda x: x > 0, numbers)

# ❌ 不建议使用Lambda的场景
# 1. 复杂的业务逻辑
# 不推荐
process = lambda data: {
    k: [i * 2 for i in v if i > 0]
    for k, v in data.items()
    if isinstance(v, list)
}

# ✅ 推荐使用常规函数
def process_data(data: Dict[str, List[int]]) -> Dict[str, List[int]]:
    """处理数据集合
    
    Args:
        data: 输入数据字典
        
    Returns:
        处理后的数据字典
    """
    result = {}
    for key, values in data.items():
        if isinstance(values, list):
            result[key] = [i * 2 for i in values if i > 0]
    return result

1.4 实战示例:数据分析管道

from typing import List, Dict, Any
from datetime import datetime

class DataPipeline:
    """数据处理管道"""
    
    def __init__(self):
        self.transforms: List[Callable] = []
    
    def add_transform(self, transform: Callable) -> 'DataPipeline':
        """添加转换步骤"""
        self.transforms.append(transform)
        return self
    
    def process(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """执行所有转换"""
        result = data
        for transform in self.transforms:
            result = transform(result)
        return result

# 使用示例
# 1. 创建转换函数
filter_active = lambda data: [
    item for item in data
    if item.get('status') == 'active'
]

calculate_metrics = lambda data: [
    {**item, 'efficiency': item['output'] / item['input'] if item['input'] else 0}
    for item in data
]

add_timestamp = lambda data: [
    {**item, 'processed_at': datetime.now().isoformat()}
    for item in data
]

# 2. 构建和使用管道
pipeline = DataPipeline()
pipeline.add_transform(filter_active)\
       .add_transform(calculate_metrics)\
       .add_transform(add_timestamp)

# 3. 处理数据
sample_data = [
    {'id': 1, 'status': 'active', 'input': 100, 'output': 85},
    {'id': 2, 'status': 'inactive', 'input': 90, 'output': 70},
    {'id': 3, 'status': 'active', 'input': 95, 'output': 80}
]

processed_data = pipeline.process(sample_data)

2. 装饰器的使用 🎨

2.1 基本装饰器

from functools import wraps
import time
from typing import Callable, TypeVar, Any

T = TypeVar('T', bound=Callable[..., Any])

def timing_decorator(func: T) -> T:
    """测量函数执行时间的装饰器"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f"函数 {func.__name__} 执行时间: {end_time - start_time:.4f} 秒")
        return result
    return wrapper

@timing_decorator
def slow_function():
    time.sleep(1)
    return "完成"

2.2 带参数的装饰器

def retry(max_attempts: int = 3, delay: float = 1.0):
    """创建一个重试装饰器
    
    Args:
        max_attempts: 最大重试次数
        delay: 重试间隔(秒)
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            attempts = 0
            while attempts < max_attempts:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    attempts += 1
                    if attempts == max_attempts:
                        raise e
                    time.sleep(delay)
            return None
        return wrapper
    return decorator

@retry(max_attempts=3, delay=2.0)
def unstable_network_call():
    """模拟不稳定的网络调用"""
    if random.random() < 0.7:  # 70%的失败率
        raise ConnectionError("网络连接失败")
    return "成功"

2.3 多个装饰器的组合

def log_decorator(func: Callable) -> Callable:
    @wraps(func)
    def wrapper(*args, **kwargs):
        print(f"调用函数: {func.__name__}")
        return func(*args, **kwargs)
    return wrapper

def validate_inputs(func: Callable) -> Callable:
    @wraps(func)
    def wrapper(*args, **kwargs):
        if not args and not kwargs:
            raise ValueError("函数参数不能为空")
        return func(*args, **kwargs)
    return wrapper

@log_decorator
@validate_inputs
@timing_decorator
def process_data(data: List[Any]) -> List[Any]:
    """处理数据的函数"""
    return [item for item in data if item is not None]

3. 生成器与迭代器 🔄

3.1 生成器函数

from typing import Generator, Iterator, List

def fibonacci_generator(n: int) -> Generator[int, None, None]:
    """生成斐波那契数列的生成器
    
    Args:
        n: 要生成的数字个数
        
    Yields:
        斐波那契数列中的下一个数字
    """
    a, b = 0, 1
    for _ in range(n):
        yield a
        a, b = b, a + b

# 内存效率对比
def get_numbers_list(n: int) -> List[int]:
    """返回列表的方式"""
    return [i ** 2 for i in range(n)]

def get_numbers_generator(n: int) -> Generator[int, None, None]:
    """生成器方式"""
    for i in range(n):
        yield i ** 2

3.2 生成器表达式

# 列表推导式 vs 生成器表达式
numbers = [1, 2, 3, 4, 5]

# 列表推导式(立即计算)
squares_list = [x ** 2 for x in numbers]  # 创建新列表

# 生成器表达式(惰性计算)
squares_gen = (x ** 2 for x in numbers)   # 创建生成器对象

# 条件筛选
even_squares = (x ** 2 for x in numbers if x % 2 == 0)

# 多重生成器
matrix = ((i, j) for i in range(3) for j in range(3))

3.3 自定义迭代器

class DataIterator:
    """自定义数据迭代器"""
    
    def __init__(self, data: List[Any]):
        self.data = data
        self.index = 0
    
    def __iter__(self) -> 'DataIterator':
        return self
    
    def __next__(self) -> Any:
        if self.index >= len(self.data):
            raise StopIteration
        value = self.data[self.index]
        self.index += 1
        return value

# 使用示例
class DataContainer:
    def __init__(self, data: List[Any]):
        self.data = data
    
    def __iter__(self) -> DataIterator:
        return DataIterator(self.data)

4. 递归函数应用 🌳

4.1 基本递归模式

from typing import Any, List, Optional

def factorial(n: int) -> int:
    """计算阶乘的递归实现"""
    if n <= 1:  # 基本情况
        return 1
    return n * factorial(n - 1)  # 递归情况

def binary_search(arr: List[int], target: int, left: int = 0, right: Optional[int] = None) -> int:
    """递归实现二分查找
    
    Returns:
        目标值的索引,如果未找到返回-1
    """
    if right is None:
        right = len(arr) - 1
    
    if left > right:
        return -1
    
    mid = (left + right) // 2
    if arr[mid] == target:
        return mid
    elif arr[mid] > target:
        return binary_search(arr, target, left, mid - 1)
    else:
        return binary_search(arr, target, mid + 1, right)

4.2 高级递归应用

class TreeNode:
    def __init__(self, value: Any):
        self.value = value
        self.left: Optional[TreeNode] = None
        self.right: Optional[TreeNode] = None

def tree_traversal(root: Optional[TreeNode], method: str = "inorder") -> Generator[Any, None, None]:
    """通用的树遍历生成器
    
    Args:
        root: 树的根节点
        method: 遍历方法 ("preorder", "inorder", "postorder")
    """
    if not root:
        return
    
    if method == "preorder":
        yield root.value
        yield from tree_traversal(root.left, method)
        yield from tree_traversal(root.right, method)
    
    elif method == "inorder":
        yield from tree_traversal(root.left, method)
        yield root.value
        yield from tree_traversal(root.right, method)
    
    elif method == "postorder":
        yield from tree_traversal(root.left, method)
        yield from tree_traversal(root.right, method)
        yield root.value

5. 实战案例:文件批处理工具 🛠️

5.1 文件处理工具类

from pathlib import Path
from typing import Generator, List, Dict, Callable
import os
import shutil

class FileBatchProcessor:
    """文件批处理工具"""
    
    def __init__(self, source_dir: str):
        self.source_dir = Path(source_dir)
        self.processors: Dict[str, Callable] = {}
    
    def register_processor(self, extension: str, processor: Callable) -> None:
        """注册文件处理器"""
        self.processors[extension] = processor
    
    def find_files(self, pattern: str = "*") -> Generator[Path, None, None]:
        """递归查找文件"""
        for item in self.source_dir.rglob(pattern):
            if item.is_file():
                yield item
    
    def process_files(self) -> Dict[str, List[str]]:
        """处理所有文件"""
        results = {"success": [], "error": []}
        
        for file_path in self.find_files():
            try:
                extension = file_path.suffix.lower()
                if extension in self.processors:
                    self.processors[extension](file_path)
                    results["success"].append(str(file_path))
            except Exception as e:
                results["error"].append(f"{file_path}: {str(e)}")
        
        return results

5.2 实际应用示例

# 文件处理器函数
def process_text_file(file_path: Path) -> None:
    """处理文本文件"""
    with file_path.open('r', encoding='utf-8') as f:
        content = f.read()
    
    # 处理文本内容
    processed_content = content.upper()
    
    # 保存处理后的文件
    backup_path = file_path.with_suffix(file_path.suffix + '.bak')
    shutil.copy2(file_path, backup_path)
    
    with file_path.open('w', encoding='utf-8') as f:
        f.write(processed_content)

def process_image_file(file_path: Path) -> None:
    """处理图片文件(示例)"""
    # 这里可以添加图片处理逻辑
    pass

# 使用示例
def main():
    # 创建处理器实例
    processor = FileBatchProcessor("./documents")
    
    # 注册文件处理器
    processor.register_processor(".txt", process_text_file)
    processor.register_processor(".md", process_text_file)
    processor.register_processor(".jpg", process_image_file)
    processor.register_processor(".png", process_image_file)
    
    # 执行批处理
    results = processor.process_files()
    
    # 输出处理结果
    print(f"成功处理的文件: {len(results['success'])}")
    print(f"处理失败的文件: {len(results['error'])}")
    
    if results['error']:
        print("\n错误详情:")
        for error in results['error']:
            print(f"- {error}")

if __name__ == "__main__":
    main()

本文详细介绍了Python的高级函数特性,从匿名函数到装饰器,从生成器到递归函数,最后通过一个实战案例展示了这些特性的实际应用。

希望这些内容对你的Python编程之旅有所帮助!

如果你觉得这篇文章有帮助,欢迎点赞转发,也期待在评论区看到你的想法和建议!👇

咱们下一期见!

举报

相关推荐

0 条评论