Geospatial Computation Optimization and High-Performance Algorithms

This article takes a deep look at performance optimization techniques for geospatial computation, covering spatial index acceleration, parallel computing strategies, GPU acceleration, and the use of approximate algorithms in GIS, with complete code implementations and performance comparisons.

1. Efficient Spatial Index Structures

1.1 Hybrid Quadtree and Grid Index Structure

import numpy as np
from typing import List, Tuple

class HybridIndex:
    """Hybrid quadtree + grid index structure"""
    def __init__(self, bounds: Tuple[float, float, float, float], 
                 max_depth: int = 5, 
                 max_items: int = 10,
                 grid_resolution: int = 100):
        """
        :param bounds: (minx, miny, maxx, maxy) spatial extent
        :param max_depth: maximum quadtree depth
        :param max_items: maximum number of items per node
        :param grid_resolution: grid resolution
        """
        self.bounds = bounds
        self.max_depth = max_depth
        self.max_items = max_items
        self.grid_resolution = grid_resolution
        
        # Compute the grid cell size
        self.grid_cell_width = (bounds[2] - bounds[0]) / grid_resolution
        self.grid_cell_height = (bounds[3] - bounds[1]) / grid_resolution
        
        # Initialize the grid index
        self.grid = [[[] for _ in range(grid_resolution)] 
                    for _ in range(grid_resolution)]
        
        # Initialize the quadtree root node
        self.root = QuadTreeNode(
            bounds[0], bounds[1], 
            bounds[2], bounds[3],
            max_depth, max_items
        )
    
    def insert(self, item: object, x: float, y: float):
        """Insert an item into the index"""
        # Insert into both the grid and the quadtree
        grid_x, grid_y = self._get_grid_position(x, y)
        self.grid[grid_x][grid_y].append((item, x, y))
        
        self.root.insert(item, x, y)
    
    def query_range(self, qx1: float, qy1: float, qx2: float, qy2: float) -> List[object]:
        """Range query"""
        # First use the grid for fast filtering
        min_gx, min_gy = self._get_grid_position(qx1, qy1)
        max_gx, max_gy = self._get_grid_position(qx2, qy2)
        
        candidates = set()
        for gx in range(min_gx, max_gx + 1):
            for gy in range(min_gy, max_gy + 1):
                for item, x, y in self.grid[gx][gy]:
                    if qx1 <= x <= qx2 and qy1 <= y <= qy2:
                        candidates.add(item)
        
        # Then run an exact query on the quadtree
        quad_results = self.root.query_range(qx1, qy1, qx2, qy2)
        
        # Merge and deduplicate the results
        return list(candidates.union(quad_results))
    
    def _get_grid_position(self, x: float, y: float) -> Tuple[int, int]:
        """Map a coordinate to its grid cell"""
        gx = int((x - self.bounds[0]) // self.grid_cell_width)
        gy = int((y - self.bounds[1]) // self.grid_cell_height)
        # Clamp each axis independently to the valid cell range
        gx = min(max(gx, 0), self.grid_resolution - 1)
        gy = min(max(gy, 0), self.grid_resolution - 1)
        return gx, gy

class QuadTreeNode:
    """Quadtree node"""
    def __init__(self, x1: float, y1: float, x2: float, y2: float,
                 max_depth: int, max_items: int, depth: int = 0):
        self.x1, self.y1 = x1, y1  # lower-left corner
        self.x2, self.y2 = x2, y2  # upper-right corner
        self.max_depth = max_depth
        self.max_items = max_items
        self.depth = depth
        self.items = []  # (item, x, y)
        self.children = None
    
    def insert(self, item: object, x: float, y: float):
        """Insert an item into the quadtree"""
        # Reject points outside this node's bounds
        if not (self.x1 <= x <= self.x2 and self.y1 <= y <= self.y2):
            return False
        
        # If the node has children, delegate to them
        if self.children is not None:
            for child in self.children:
                if child.insert(item, x, y):
                    return True
            return False
        
        # Otherwise store the item in this node
        self.items.append((item, x, y))
        
        # Split if the node is over capacity and below the maximum depth
        if len(self.items) > self.max_items and self.depth < self.max_depth:
            self._split()
            
        return True
    
    def _split(self):
        """Split this node into four children"""
        mid_x = (self.x1 + self.x2) / 2
        mid_y = (self.y1 + self.y2) / 2
        
        self.children = [
            QuadTreeNode(self.x1, mid_y, mid_x, self.y2, 
                        self.max_depth, self.max_items, self.depth + 1),  # NW
            QuadTreeNode(mid_x, mid_y, self.x2, self.y2, 
                        self.max_depth, self.max_items, self.depth + 1),  # NE
            QuadTreeNode(self.x1, self.y1, mid_x, mid_y, 
                        self.max_depth, self.max_items, self.depth + 1),  # SW
            QuadTreeNode(mid_x, self.y1, self.x2, mid_y, 
                        self.max_depth, self.max_items, self.depth + 1)   # SE
        ]
        
        # Redistribute this node's items to the children
        for item, x, y in self.items:
            for child in self.children:
                if child.insert(item, x, y):
                    break
        
        self.items = []
    
    def query_range(self, qx1: float, qy1: float, qx2: float, qy2: float) -> List[object]:
        """Range query"""
        results = []
        
        # Check whether the query rectangle intersects this node
        if not (qx1 > self.x2 or qx2 < self.x1 or qy1 > self.y2 or qy2 < self.y1):
            # If the node has children, query them
            if self.children is not None:
                for child in self.children:
                    results.extend(child.query_range(qx1, qy1, qx2, qy2))
            else:
                # Otherwise test this node's items
                for item, x, y in self.items:
                    if qx1 <= x <= qx2 and qy1 <= y <= qy2:
                        results.append(item)
        
        return results

# Performance benchmark
if __name__ == "__main__":
    import time
    import random
    
    # Generate test data
    bounds = (0, 0, 1000, 1000)
    num_items = 100000
    items = [(i, random.uniform(0, 1000), random.uniform(0, 1000)) 
             for i in range(num_items)]
    
    # Build the hybrid index
    hybrid_index = HybridIndex(bounds)
    start = time.time()
    for item in items:
        hybrid_index.insert(*item)
    build_time = time.time() - start
    
    # Measure query performance
    query_count = 1000
    query_time = 0
    total_results = 0
    for _ in range(query_count):
        qx1 = random.uniform(0, 900)
        qy1 = random.uniform(0, 900)
        qx2 = qx1 + random.uniform(10, 100)
        qy2 = qy1 + random.uniform(10, 100)
        
        start = time.time()
        results = hybrid_index.query_range(qx1, qy1, qx2, qy2)
        query_time += time.time() - start
        total_results += len(results)
    
    print(f"Hybrid index build time: {build_time:.4f}s")
    print(f"Average query time: {(query_time/query_count)*1000:.2f}ms")
    print(f"Average result count per query: {total_results/query_count:.1f}")

1.2 Adaptive R*-Tree Index

import numpy as np
import math

class RStarTree:
    """Adaptive R*-tree index"""
    def __init__(self, max_entries=4, min_entries=2, 
                 reinsert_factor=0.3, split_distribution=0.4):
        """
        :param max_entries: maximum entries per node
        :param min_entries: minimum entries per node
        :param reinsert_factor: fraction of entries to reinsert on overflow
        :param split_distribution: split distribution ratio
        """
        self.max_entries = max_entries
        self.min_entries = min_entries
        self.reinsert_factor = reinsert_factor
        self.split_distribution = split_distribution
        self.root = RStarTreeNode(is_leaf=True)
    
    def insert(self, item, bounds):
        """Insert an item with its bounding box"""
        entry = RStarEntry(item, bounds)
        self._insert(entry, self.root, 0)
    
    def _insert(self, entry, node, depth):
        """Recursive insert"""
        if node.is_leaf:
            node.entries.append(entry)
            
            # Handle overflow if the node is over capacity
            if len(node.entries) > self.max_entries:
                self._handle_overflow(node, depth)
        else:
            # Choose the best subtree to descend into
            best_child = self._choose_subtree(node, entry)
            self._insert(entry, best_child, depth + 1)
            
            # Refresh the bounding boxes stored in this node
            self._adjust_bounds(node)
    
    def _adjust_bounds(self, node):
        """Recompute each child entry's stored bounds from its subtree"""
        for e in node.entries:
            if e.child is not None:
                e.bounds = e.child.compute_bounds()
    
    def _choose_subtree(self, node, entry):
        """Choose the subtree whose bounding box grows the least"""
        if node.is_leaf:
            return node
        
        min_increase = float('inf')
        best_entry = None
        
        for child_entry in node.entries:
            # Bounding box after absorbing the new entry
            merged = self._merge_bounds(child_entry.bounds, entry.bounds)
            
            # Area enlargement
            increase = self._compute_area(merged) - self._compute_area(child_entry.bounds)
            
            # Break ties by the smaller existing area
            if increase < min_increase or \
               (increase == min_increase and 
                self._compute_area(child_entry.bounds) < self._compute_area(best_entry.bounds)):
                min_increase = increase
                best_entry = child_entry
        
        return best_entry.child
    
    def _handle_overflow(self, node, depth):
        """Handle node overflow"""
        # Try forced reinsertion first
        if depth != 0 and len(node.entries) > self.min_entries:
            self._reinsert(node, depth)
        else:
            # Split the node
            split_nodes = self._split_node(node)
            
            if node == self.root:
                # Grow the tree with a new root
                new_root = RStarTreeNode(is_leaf=False)
                new_root.entries = [
                    RStarEntry(None, split_nodes[0].compute_bounds(), split_nodes[0]),
                    RStarEntry(None, split_nodes[1].compute_bounds(), split_nodes[1])
                ]
                self.root = new_root
            else:
                # Replace the overflowing node in its parent with the two split nodes
                # (omitted in this simplified example)
                pass
    
    def _reinsert(self, node, depth):
        """Reinsert a fraction of the entries"""
        # Sort by distance from the node centre (ascending, farthest last)
        center = self._compute_center(node.compute_bounds())
        node.entries.sort(
            key=lambda e: self._compute_distance(
                self._compute_center(e.bounds), center
            )
        )
        
        # Remove and reinsert the farthest entries
        reinsert_count = int(math.ceil(len(node.entries) * self.reinsert_factor))
        to_reinsert = node.entries[-reinsert_count:]
        node.entries = node.entries[:-reinsert_count]
        
        for entry in to_reinsert:
            self._insert(entry, self.root, depth)
    
    def _split_node(self, node):
        """Split a node"""
        # Split along the best axis
        best_axis, best_distribution = self._choose_split_axis(node)
        return self._distribute_entries(node, best_axis, best_distribution)
    
    def _choose_split_axis(self, node):
        """Choose the split axis and distribution with the smallest total perimeter"""
        min_margin = float('inf')
        best_axis = 0
        best_distribution = 0
        
        for axis in range(2):  # 0 = x, 1 = y
            # Sort by the lower bound on the current axis
            node.entries.sort(key=lambda e: e.bounds[axis*2])
            
            # Try several distribution ratios
            for distribution in np.linspace(self.split_distribution, 
                                          1-self.split_distribution, 5):
                split_pos = int(len(node.entries) * distribution)
                
                # Compute the bounds and perimeters of the two groups
                group1 = node.entries[:split_pos]
                group2 = node.entries[split_pos:]
                
                bounds1 = self._compute_group_bounds(group1)
                bounds2 = self._compute_group_bounds(group2)
                
                margin = (self._compute_perimeter(bounds1) + 
                         self._compute_perimeter(bounds2))
                
                if margin < min_margin:
                    min_margin = margin
                    best_axis = axis
                    best_distribution = distribution
        
        return best_axis, best_distribution
    
    def _distribute_entries(self, node, axis, distribution):
        """Distribute the entries along the chosen axis and ratio"""
        node.entries.sort(key=lambda e: e.bounds[axis*2])
        split_pos = int(len(node.entries) * distribution)
        
        # Create the two new nodes
        node1 = RStarTreeNode(is_leaf=node.is_leaf)
        node2 = RStarTreeNode(is_leaf=node.is_leaf)
        
        node1.entries = node.entries[:split_pos]
        node2.entries = node.entries[split_pos:]
        
        return node1, node2
    
    # Geometry helpers
    @staticmethod
    def _merge_bounds(b1, b2):
        return (
            min(b1[0], b2[0]), min(b1[1], b2[1]),
            max(b1[2], b2[2]), max(b1[3], b2[3])
        )
    
    @staticmethod
    def _compute_area(bounds):
        return (bounds[2] - bounds[0]) * (bounds[3] - bounds[1])
    
    @staticmethod
    def _compute_perimeter(bounds):
        return 2 * ((bounds[2] - bounds[0]) + (bounds[3] - bounds[1]))
    
    @staticmethod
    def _compute_center(bounds):
        return (
            (bounds[0] + bounds[2]) / 2,
            (bounds[1] + bounds[3]) / 2
        )
    
    @staticmethod
    def _compute_distance(p1, p2):
        return math.sqrt((p1[0]-p2[0])**2 + (p1[1]-p2[1])**2)
    
    @staticmethod
    def _compute_group_bounds(entries):
        if not entries:
            return (0, 0, 0, 0)
        
        minx = min(e.bounds[0] for e in entries)
        miny = min(e.bounds[1] for e in entries)
        maxx = max(e.bounds[2] for e in entries)
        maxy = max(e.bounds[3] for e in entries)
        
        return (minx, miny, maxx, maxy)

class RStarTreeNode:
    """R*-tree node"""
    def __init__(self, is_leaf):
        self.is_leaf = is_leaf
        self.entries = []  # list of entries
    
    def compute_bounds(self):
        """Compute this node's bounding box"""
        if not self.entries:
            return (0, 0, 0, 0)
        
        if self.is_leaf:
            minx = min(e.bounds[0] for e in self.entries)
            miny = min(e.bounds[1] for e in self.entries)
            maxx = max(e.bounds[2] for e in self.entries)
            maxy = max(e.bounds[3] for e in self.entries)
        else:
            minx = min(e.child.compute_bounds()[0] for e in self.entries)
            miny = min(e.child.compute_bounds()[1] for e in self.entries)
            maxx = max(e.child.compute_bounds()[2] for e in self.entries)
            maxy = max(e.child.compute_bounds()[3] for e in self.entries)
        
        return (minx, miny, maxx, maxy)

class RStarEntry:
    """R*-tree entry"""
    def __init__(self, item, bounds, child=None):
        self.item = item      # leaf entries store the data item
        self.bounds = bounds  # (minx, miny, maxx, maxy)
        self.child = child    # internal entries store a child node

# Usage example
if __name__ == "__main__":
    # Create the R*-tree index
    rtree = RStarTree(max_entries=4)
    
    # Insert some data
    data = [
        ("A", (0, 0, 10, 10)),
        ("B", (5, 5, 15, 15)),
        ("C", (10, 10, 20, 20)),
        ("D", (15, 15, 25, 25)),
        ("E", (20, 20, 30, 30))
    ]
    
    for item, bounds in data:
        rtree.insert(item, bounds)
    
    # Query example
    query_bounds = (8, 8, 18, 18)
    print(f"Query bounds: {query_bounds}")
    
    # A query method would still need to be implemented for real use
    # results = rtree.query(query_bounds)
    # print(f"Query results: {results}")

2. Parallel Spatial Computation

2.1 Parallel Spatial Joins with Dask

import dask.dataframe as dd
import dask_geopandas as dg
import geopandas as gpd
from shapely.geometry import Point, box
import numpy as np
import time

def generate_points(num_points, bounds):
    """Generate random point data"""
    x = np.random.uniform(bounds[0], bounds[2], num_points)
    y = np.random.uniform(bounds[1], bounds[3], num_points)
    return gpd.GeoDataFrame({
        'id': range(num_points),
        'geometry': [Point(xi, yi) for xi, yi in zip(x, y)]
    })

def generate_polygons(num_polygons, bounds):
    """Generate random polygon data"""
    polygons = []
    size = min(bounds[2]-bounds[0], bounds[3]-bounds[1]) / 10
    
    for i in range(num_polygons):
        x = np.random.uniform(bounds[0], bounds[2]-size)
        y = np.random.uniform(bounds[1], bounds[3]-size)
        polygons.append({
            'id': i,
            'geometry': box(x, y, x+size, y+size)
        })
    
    return gpd.GeoDataFrame(polygons)

def spatial_join_parallel(points_df, polygons_df, n_partitions=4):
    """Parallel spatial join"""
    # Convert to Dask GeoDataFrames
    dask_points = dg.from_geopandas(points_df, npartitions=n_partitions)
    dask_polygons = dg.from_geopandas(polygons_df, npartitions=n_partitions)
    
    # Run the spatial join
    start = time.time()
    joined = dask_points.sjoin(dask_polygons, how='inner', predicate='within')
    result = joined.compute()
    elapsed = time.time() - start
    
    return result, elapsed

def spatial_join_sequential(points_df, polygons_df):
    """Sequential spatial join"""
    start = time.time()
    result = gpd.sjoin(points_df, polygons_df, how='inner', predicate='within')
    elapsed = time.time() - start
    return result, elapsed

# Performance benchmark
if __name__ == "__main__":
    # Generate test data
    bounds = (0, 0, 1000, 1000)
    points = generate_points(100000, bounds)
    polygons = generate_polygons(1000, bounds)
    
    print(f"Number of points: {len(points)}")
    print(f"Number of polygons: {len(polygons)}")
    
    # Sequential run
    _, seq_time = spatial_join_sequential(points, polygons)
    print(f"Sequential time: {seq_time:.2f}s")
    
    # Parallel run (4 partitions)
    _, parallel_time = spatial_join_parallel(points, polygons, n_partitions=4)
    print(f"Parallel time (4 partitions): {parallel_time:.2f}s")
    print(f"Speedup: {seq_time/parallel_time:.2f}x")
    
    # Parallel run (8 partitions)
    _, parallel_time = spatial_join_parallel(points, polygons, n_partitions=8)
    print(f"Parallel time (8 partitions): {parallel_time:.2f}s")
    print(f"Speedup: {seq_time/parallel_time:.2f}x")

2.2 Distributed Spatial Analysis with Ray

import ray
import numpy as np
from shapely.geometry import Point, Polygon
import geopandas as gpd
import time

# Initialize Ray
ray.init(num_cpus=8)

@ray.remote
class SpatialWorker:
    """Spatial analysis worker"""
    def __init__(self, points):
        self.points = points
    
    def count_points_in_polygon(self, polygon):
        """Count the points inside a polygon"""
        count = 0
        for point in self.points.geometry:
            if polygon.contains(point):
                count += 1
        return count
    
    def compute_density(self, polygon, area):
        """Compute point density"""
        count = self.count_points_in_polygon(polygon)
        return count / area if area > 0 else 0

def distributed_spatial_analysis(points, polygons, num_workers=4):
    """Distributed spatial analysis"""
    # Split the point data; the last chunk also takes any remainder rows
    chunk_size = len(points) // num_workers
    point_chunks = [
        points.iloc[i*chunk_size : (i+1)*chunk_size] if i < num_workers - 1
        else points.iloc[i*chunk_size :]
        for i in range(num_workers)
    ]
    
    # Create the remote workers
    workers = [SpatialWorker.remote(chunk) for chunk in point_chunks]
    
    results = []
    for _, row in polygons.iterrows():
        polygon = row['geometry']
        area = row['area']
        
        # Compute the density contributions in parallel
        density_futures = [
            worker.compute_density.remote(polygon, area) 
            for worker in workers
        ]
        densities = ray.get(density_futures)
        total_density = sum(densities)
        
        results.append({
            'polygon_id': row['id'],
            'density': total_density
        })
    
    return gpd.GeoDataFrame(results)

# Generate test data
def generate_data(num_points=100000, num_polygons=100):
    bounds = (0, 0, 1000, 1000)
    
    # Random points
    x = np.random.uniform(bounds[0], bounds[2], num_points)
    y = np.random.uniform(bounds[1], bounds[3], num_points)
    points = gpd.GeoDataFrame({
        'id': range(num_points),
        'geometry': [Point(xi, yi) for xi, yi in zip(x, y)]
    })
    
    # Random square polygons
    polygons = []
    size = 50
    for i in range(num_polygons):
        x = np.random.uniform(bounds[0], bounds[2]-size)
        y = np.random.uniform(bounds[1], bounds[3]-size)
        polygon = Polygon([(x, y), (x+size, y), (x+size, y+size), (x, y+size)])
        polygons.append({
            'id': i,
            'geometry': polygon,
            'area': size * size
        })
    
    return points, gpd.GeoDataFrame(polygons)

# Performance benchmark
if __name__ == "__main__":
    # Generate test data
    points, polygons = generate_data()
    
    print(f"Number of points: {len(points)}")
    print(f"Number of polygons: {len(polygons)}")
    
    # Sequential run
    start = time.time()
    results_seq = []
    for _, row in polygons.iterrows():
        count = 0
        for point in points.geometry:
            if row['geometry'].contains(point):
                count += 1
        density = count / row['area']
        results_seq.append({
            'polygon_id': row['id'],
            'density': density
        })
    seq_time = time.time() - start
    
    # Distributed run (4 workers)
    start = time.time()
    results_dist = distributed_spatial_analysis(points, polygons, num_workers=4)
    dist_time = time.time() - start
    
    print(f"Sequential time: {seq_time:.2f}s")
    print(f"Distributed time (4 workers): {dist_time:.2f}s")
    print(f"Speedup: {seq_time/dist_time:.2f}x")
    
    # Verify the results
    assert len(results_dist) == len(results_seq)
    print("Results verified")
    
    ray.shutdown()
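The per-point contains() loop inside SpatialWorker dominates the runtime. GeoPandas can evaluate the same predicate for a whole chunk at once with GeoSeries.within, which is usually far faster. The variant below is a sketch of how the worker could be rewritten; VectorizedSpatialWorker is a name introduced here and is interchangeable with SpatialWorker in distributed_spatial_analysis.

import ray

@ray.remote
class VectorizedSpatialWorker:
    """Worker that counts points with a vectorized spatial predicate."""
    def __init__(self, points):
        self.points = points  # GeoDataFrame chunk held by this actor
    
    def count_points_in_polygon(self, polygon):
        # GeoSeries.within tests every point in the chunk in one call
        return int(self.points.geometry.within(polygon).sum())
    
    def compute_density(self, polygon, area):
        count = self.count_points_in_polygon(polygon)
        return count / area if area > 0 else 0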

3. GPU-Accelerated Spatial Computation

3.1 GPU Spatial Joins with RAPIDS

import cudf
import cuspatial
import numpy as np
import time

def generate_gpu_data(num_points=1000000, num_polygons=1000):
    """Generate GPU test data"""
    # Random points
    x = np.random.uniform(0, 1000, num_points)
    y = np.random.uniform(0, 1000, num_points)
    points = cudf.DataFrame({'x': x, 'y': y})
    
    # Random polygons (axis-aligned squares)
    polygon_size = 10
    poly_x = np.random.uniform(0, 1000-polygon_size, num_polygons)
    poly_y = np.random.uniform(0, 1000-polygon_size, num_polygons)
    
    # Build the polygon layout expected by cuspatial:
    # each ring has 5 vertices (a closed square), offsets mark ring starts
    poly_offsets = cudf.Series(range(0, num_polygons*5, 5), dtype='int32')
    
    poly_points_x = []
    poly_points_y = []
    for i in range(num_polygons):
        poly_points_x.extend([
            poly_x[i], poly_x[i]+polygon_size, 
            poly_x[i]+polygon_size, poly_x[i], poly_x[i]
        ])
        poly_points_y.extend([
            poly_y[i], poly_y[i], 
            poly_y[i]+polygon_size, poly_y[i]+polygon_size, poly_y[i]
        ])
    
    poly_points_x = cudf.Series(poly_points_x)
    poly_points_y = cudf.Series(poly_points_y)
    
    return points, (poly_offsets, poly_points_x, poly_points_y)

def gpu_spatial_join(points, polygons):
    """GPU spatial join"""
    # Unpack the polygon arrays
    poly_offsets, poly_points_x, poly_points_y = polygons
    
    # Run the point-in-polygon test
    result = cuspatial.point_in_polygon(
        points['x'], points['y'],
        poly_offsets, poly_points_x, poly_points_y
    )
    
    # Count the points falling in each polygon (one boolean column per polygon)
    point_counts = result.sum()
    return point_counts

# Performance benchmark
if __name__ == "__main__":
    # Generate test data
    points, polygons = generate_gpu_data()
    
    print(f"Number of points: {len(points)}")
    print(f"Number of polygons: {len(polygons[0])}")
    
    # GPU run
    start = time.time()
    gpu_counts = gpu_spatial_join(points, polygons)
    gpu_time = time.time() - start
    
    print(f"GPU time: {gpu_time:.4f}s")
    print(f"Average points per polygon: {gpu_counts.mean():.2f}")
    
    # CPU comparison (geopandas/shapely)
    import geopandas as gpd
    from shapely.geometry import Point, Polygon
    
    # Prepare the CPU data
    points_cpu = gpd.GeoDataFrame({
        'geometry': [Point(x, y) for x, y in zip(
            points['x'].to_array(), 
            points['y'].to_array()
        )]
    })
    
    polygons_cpu = []
    poly_offsets, poly_points_x, poly_points_y = polygons
    poly_points_x = poly_points_x.to_array()
    poly_points_y = poly_points_y.to_array()
    
    # Rebuild one shapely polygon per offset entry; the last ring ends at the array end
    for i in range(len(poly_offsets)):
        start_idx = poly_offsets.iloc[i]
        end_idx = poly_offsets.iloc[i+1] if i+1 < len(poly_offsets) else len(poly_points_x)
        
        coords = list(zip(
            poly_points_x[start_idx:end_idx],
            poly_points_y[start_idx:end_idx]
        ))
        polygons_cpu.append(Polygon(coords))
    
    polygons_cpu = gpd.GeoDataFrame({
        'geometry': polygons_cpu
    })
    
    # CPU run (naive O(points x polygons) baseline; very slow at this data size)
    start = time.time()
    cpu_counts = []
    for poly in polygons_cpu.geometry:
        count = 0
        for point in points_cpu.geometry:
            if poly.contains(point):
                count += 1
        cpu_counts.append(count)
    cpu_time = time.time() - start
    
    print(f"\nCPU time: {cpu_time:.4f}s")
    print(f"Speedup: {cpu_time/gpu_time:.2f}x")
    
    # Verify the results
    gpu_counts = gpu_counts.to_array()
    assert np.allclose(gpu_counts, cpu_counts)
    print("Results verified")

3.2 GPU Spatial Interpolation with PyTorch

import torch
import numpy as np
import time
from sklearn.metrics import mean_squared_error

class GPUIDW:
    """GPU-accelerated inverse distance weighting (IDW) interpolation"""
    def __init__(self, power=2, k_neighbors=10):
        self.power = power  # distance exponent
        self.k = k_neighbors  # number of nearest neighbours
    
    def fit(self, points, values):
        """Store the sample data on the GPU"""
        # Convert to PyTorch tensors and move them to the GPU
        self.points = torch.tensor(points, dtype=torch.float32).cuda()
        self.values = torch.tensor(values, dtype=torch.float32).cuda()
        
    def predict(self, query_points):
        """Predict values at the query points"""
        query_points = torch.tensor(query_points, dtype=torch.float32).cuda()
        
        # Pairwise distances between query points and samples
        dists = torch.cdist(query_points, self.points)
        
        # k nearest neighbours per query point
        knn_dists, knn_indices = torch.topk(dists, self.k, largest=False, dim=1)
        
        # Inverse-distance weights (epsilon avoids division by zero)
        weights = 1.0 / (knn_dists ** self.power + 1e-10)
        weights = weights / weights.sum(dim=1, keepdim=True)
        
        # Weighted average of the neighbour values
        knn_values = self.values[knn_indices]
        preds = (weights * knn_values).sum(dim=1)
        
        return preds.cpu().numpy()

# Generate test data
def generate_data(n_samples=10000, n_query=1000):
    np.random.seed(42)
    points = np.random.rand(n_samples, 2) * 100
    values = np.sin(points[:,0]/10) + np.cos(points[:,1]/10) + np.random.normal(0, 0.1, n_samples)
    query_points = np.random.rand(n_query, 2) * 100
    return points, values, query_points

# CPU implementation for comparison
def cpu_idw(points, values, query_points, power=2, k=10):
    from scipy.spatial import cKDTree
    
    tree = cKDTree(points)
    dists, indices = tree.query(query_points, k=k)
    
    preds = []
    for i in range(len(query_points)):
        if np.any(dists[i] == 0):
            # The query point coincides with a sample point
            idx = np.where(dists[i] == 0)[0][0]
            pred = values[indices[i][idx]]
        else:
            weights = 1.0 / (dists[i] ** power)
            weights /= weights.sum()
            pred = np.sum(weights * values[indices[i]])
        preds.append(pred)
    
    return np.array(preds)

# Performance benchmark
if __name__ == "__main__":
    # Generate data
    points, values, query_points = generate_data(n_samples=100000, n_query=10000)
    
    # GPU-accelerated IDW
    gpu_idw = GPUIDW(power=2, k_neighbors=10)
    gpu_idw.fit(points, values)
    
    start = time.time()
    gpu_preds = gpu_idw.predict(query_points)
    gpu_time = time.time() - start
    print(f"GPU IDW time: {gpu_time:.4f}s")
    
    # CPU IDW
    start = time.time()
    cpu_preds = cpu_idw(points, values, query_points)
    cpu_time = time.time() - start
    print(f"CPU IDW time: {cpu_time:.4f}s")
    print(f"Speedup: {cpu_time/gpu_time:.2f}x")
    
    # Compare the two results
    mse = mean_squared_error(cpu_preds, gpu_preds)
    print(f"MSE between CPU and GPU results: {mse:.6f}")

4. Approximate Algorithms and Spatial Computation Optimization

4.1 HyperLogLog for Spatial Range Counting

import mmh3
import math
import numpy as np
from typing import Tuple

class SpatialHLL:
    """HyperLogLog-based approximate spatial range counting"""
    def __init__(self, bounds: Tuple[float, float, float, float], 
                 precision: int = 12, grid_resolution: int = 100):
        """
        :param bounds: (minx, miny, maxx, maxy) spatial extent
        :param precision: HLL precision parameter (typically 10-16)
        :param grid_resolution: grid resolution
        """
        self.bounds = bounds
        self.p = precision
        self.m = 1 << precision  # number of registers
        self.alpha = self._get_alpha()
        
        # Initialize the grid
        self.grid_resolution = grid_resolution
        self.grid_cell_width = (bounds[2] - bounds[0]) / grid_resolution
        self.grid_cell_height = (bounds[3] - bounds[1]) / grid_resolution
        
        # One HLL register array per grid cell
        self.grid = [[self._create_hll() 
                     for _ in range(grid_resolution)] 
                    for _ in range(grid_resolution)]
    
    def _create_hll(self):
        """Create a fresh HLL register array"""
        return [0] * self.m
    
    def _get_alpha(self):
        """HLL bias-correction constant"""
        if self.p == 4:
            return 0.673
        elif self.p == 5:
            return 0.697
        elif self.p == 6:
            return 0.709
        else:
            return 0.7213 / (1 + 1.079 / self.m)
    
    def _get_grid_position(self, x: float, y: float) -> Tuple[int, int]:
        """Map a coordinate to its grid cell (clamped to the valid range)"""
        gx = int((x - self.bounds[0]) // self.grid_cell_width)
        gy = int((y - self.bounds[1]) // self.grid_cell_height)
        gx = min(max(gx, 0), self.grid_resolution - 1)
        gy = min(max(gy, 0), self.grid_resolution - 1)
        return gx, gy
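The original listing is cut off at this point. Purely as a sketch of where it is heading, the methods below show a standard per-cell HyperLogLog register update and a raw range estimate (without the small- and large-range corrections of the full algorithm); hll_add, add_point and estimate_range are names introduced here and are meant to sit inside class SpatialHLL.

    # Sketch only: the methods below continue class SpatialHLL.

    def hll_add(self, hll, key) -> None:
        """Standard HLL register update for one key."""
        h = mmh3.hash64(str(key))[0] & 0xFFFFFFFFFFFFFFFF  # force unsigned 64-bit
        idx = h & (self.m - 1)                       # low p bits pick the register
        w = h >> self.p                              # remaining 64 - p bits
        rank = (64 - self.p) - w.bit_length() + 1    # leading zeros + 1
        if rank > hll[idx]:
            hll[idx] = rank

    def add_point(self, key, x: float, y: float) -> None:
        """Register an item in the HLL of the cell containing (x, y)."""
        gx, gy = self._get_grid_position(x, y)
        self.hll_add(self.grid[gx][gy], key)

    def estimate_range(self, qx1: float, qy1: float, qx2: float, qy2: float) -> float:
        """Approximate distinct count over the cells covered by the query rectangle."""
        min_gx, min_gy = self._get_grid_position(qx1, qy1)
        max_gx, max_gy = self._get_grid_position(qx2, qy2)
        # HLLs are mergeable: take the register-wise maximum across the covered cells
        merged = [0] * self.m
        for gx in range(min_gx, max_gx + 1):
            for gy in range(min_gy, max_gy + 1):
                cell = self.grid[gx][gy]
                for j in range(self.m):
                    if cell[j] > merged[j]:
                        merged[j] = cell[j]
        # Raw HyperLogLog estimate: alpha * m^2 / sum(2^-register)
        z = sum(2.0 ** -r for r in merged)
        return self.alpha * self.m * self.m / z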
