本文将深入探讨地理空间计算中的性能优化技术,包括空间索引加速、并行计算策略、GPU加速以及近似算法在GIS中的应用,并提供完整的代码实现和性能对比分析。
一、高效空间索引结构
1.1 四叉树与网格索引混合结构
import numpy as np
from typing import List, Tuple, Dict
class HybridIndex:
"""四叉树与网格混合索引结构"""
def __init__(self, bounds: Tuple[float, float, float, float],
max_depth: int = 5,
max_items: int = 10,
grid_resolution: int = 100):
"""
:param bounds: (minx, miny, maxx, maxy) 空间范围
:param max_depth: 四叉树最大深度
:param max_items: 节点最大元素数量
:param grid_resolution: 网格分辨率
"""
self.bounds = bounds
self.max_depth = max_depth
self.max_items = max_items
self.grid_resolution = grid_resolution
# 计算网格大小
self.grid_cell_width = (bounds[2] - bounds[0]) / grid_resolution
self.grid_cell_height = (bounds[3] - bounds[1]) / grid_resolution
# 初始化网格索引
self.grid = [[[] for _ in range(grid_resolution)]
for _ in range(grid_resolution)]
# 初始化四叉树根节点
self.root = QuadTreeNode(
bounds[0], bounds[1],
bounds[2], bounds[3],
max_depth, max_items
)
def insert(self, item: object, x: float, y: float):
"""插入元素到索引中"""
# 同时插入到网格和四叉树
grid_x, grid_y = self._get_grid_position(x, y)
self.grid[grid_x][grid_y].append((item, x, y))
self.root.insert(item, x, y)
def query_range(self, qx1: float, qy1: float, qx2: float, qy2: float) -> List[object]:
"""范围查询"""
# 先使用网格快速筛选
min_gx, min_gy = self._get_grid_position(qx1, qy1)
max_gx, max_gy = self._get_grid_position(qx2, qy2)
candidates = set()
for gx in range(min_gx, max_gx + 1):
for gy in range(min_gy, max_gy + 1):
for item, x, y in self.grid[gx][gy]:
if qx1 <= x <= qx2 and qy1 <= y <= qy2:
candidates.add(item)
# 再用四叉树精确查询
quad_results = self.root.query_range(qx1, qy1, qx2, qy2)
# 合并结果并去重
return list(candidates.union(quad_results))
def _get_grid_position(self, x: float, y: float) -> Tuple[int, int]:
"""获取坐标对应的网格位置"""
gx = int((x - self.bounds[0]) // self.grid_cell_width)
gy = int((y - self.bounds[1]) // self.grid_cell_height)
return min(max(gx, 0), min(max(gy, 0), self.grid_resolution - 1)
class QuadTreeNode:
"""四叉树节点"""
def __init__(self, x1: float, y1: float, x2: float, y2: float,
max_depth: int, max_items: int, depth: int = 0):
self.x1, self.y1 = x1, y1 # 左下坐标
self.x2, self.y2 = x2, y2 # 右上坐标
self.max_depth = max_depth
self.max_items = max_items
self.depth = depth
self.items = [] # (item, x, y)
self.children = None
def insert(self, item: object, x: float, y: float):
"""插入元素到四叉树"""
# 检查是否在边界内
if not (self.x1 <= x <= self.x2 and self.y1 <= y <= self.y2):
return False
# 如果有子节点,插入到子节点
if self.children is not None:
for child in self.children:
if child.insert(item, x, y):
return True
return False
# 添加到当前节点
self.items.append((item, x, y))
# 检查是否需要分裂
if len(self.items) > self.max_items and self.depth < self.max_depth:
self._split()
return True
def _split(self):
"""分裂当前节点为4个子节点"""
mid_x = (self.x1 + self.x2) / 2
mid_y = (self.y1 + self.y2) / 2
self.children = [
QuadTreeNode(self.x1, mid_y, mid_x, self.y2,
self.max_depth, self.max_items, self.depth + 1), # NW
QuadTreeNode(mid_x, mid_y, self.x2, self.y2,
self.max_depth, self.max_items, self.depth + 1), # NE
QuadTreeNode(self.x1, self.y1, mid_x, mid_y,
self.max_depth, self.max_items, self.depth + 1), # SW
QuadTreeNode(mid_x, self.y1, self.x2, mid_y,
self.max_depth, self.max_items, self.depth + 1) # SE
]
# 将当前节点元素重新分配到子节点
for item, x, y in self.items:
for child in self.children:
if child.insert(item, x, y):
break
self.items = []
def query_range(self, qx1: float, qy1: float, qx2: float, qy2: float) -> List[object]:
"""范围查询"""
results = []
# 检查查询范围是否与当前节点相交
if not (qx1 > self.x2 or qx2 < self.x1 or qy1 > self.y2 or qy2 < self.y1):
# 如果有子节点,查询子节点
if self.children is not None:
for child in self.children:
results.extend(child.query_range(qx1, qy1, qx2, qy2))
else:
# 检查当前节点的元素
for item, x, y in self.items:
if qx1 <= x <= qx2 and qy1 <= y <= qy2:
results.append(item)
return results
# 性能对比测试
if __name__ == "__main__":
import time
import random
# 生成测试数据
bounds = (0, 0, 1000, 1000)
num_items = 100000
items = [(i, random.uniform(0, 1000), random.uniform(0, 1000))
for i in range(num_items)]
# 测试混合索引
hybrid_index = HybridIndex(bounds)
start = time.time()
for item in items:
hybrid_index.insert(*item)
build_time = time.time() - start
# 测试查询性能
query_count = 1000
query_time = 0
for _ in range(query_count):
qx1 = random.uniform(0, 900)
qy1 = random.uniform(0, 900)
qx2 = qx1 + random.uniform(10, 100)
qy2 = qy1 + random.uniform(10, 100)
start = time.time()
results = hybrid_index.query_range(qx1, qy1, qx2, qy2)
query_time += time.time() - start
print(f"混合索引构建时间: {build_time:.4f}s")
print(f"平均查询时间: {(query_time/query_count)*1000:.2f}ms")
print(f"查询结果平均数量: {len(results)/query_count:.1f}")
1.2 自适应R*树索引
import numpy as np
from typing import List, Tuple, Dict
import math
class RStarTree:
"""自适应R*树索引"""
def __init__(self, max_entries=4, min_entries=2,
reinsert_factor=0.3, split_distribution=0.4):
"""
:param max_entries: 节点最大条目数
:param min_entries: 节点最小条目数
:param reinsert_factor: 重插入比例
:param split_distribution: 分裂分布比例
"""
self.max_entries = max_entries
self.min_entries = min_entries
self.reinsert_factor = reinsert_factor
self.split_distribution = split_distribution
self.root = RStarTreeNode(is_leaf=True)
def insert(self, item, bounds):
"""插入元素"""
entry = RStarEntry(item, bounds)
self._insert(entry, self.root, 0)
def _insert(self, entry, node, depth):
"""递归插入"""
if node.is_leaf:
node.entries.append(entry)
# 检查是否需要分裂
if len(node.entries) > self.max_entries:
self._handle_overflow(node, depth)
else:
# 选择最佳子树
best_child = self._choose_subtree(node, entry)
self._insert(entry, best_child, depth + 1)
# 更新父节点边界
self._adjust_bounds(node)
def _choose_subtree(self, node, entry):
"""选择最佳子树"""
if node.is_leaf:
return node
# 计算面积增量最小的子节点
min_increase = float('inf')
best_child = None
for child in node.entries:
# 计算合并后的边界
merged = self._merge_bounds(child.bounds, entry.bounds)
# 计算面积增量
increase = self._compute_area(merged) - self._compute_area(child.bounds)
# 考虑重叠面积因素
if increase < min_increase or \
(increase == min_increase and
self._compute_area(child.bounds) < self._compute_area(best_child.bounds)):
min_increase = increase
best_child = child.child
return best_child
def _handle_overflow(self, node, depth):
"""处理节点溢出"""
# 尝试重插入
if depth != 0 and len(node.entries) > self.min_entries:
self._reinsert(node, depth)
else:
# 分裂节点
split_nodes = self._split_node(node)
if node == self.root:
# 创建新根节点
new_root = RStarTreeNode(is_leaf=False)
new_root.entries = [
RStarEntry(None, split_nodes[0].compute_bounds(), split_nodes[0]),
RStarEntry(None, split_nodes[1].compute_bounds(), split_nodes[1])
]
self.root = new_root
else:
# 替换父节点中的条目
pass
def _reinsert(self, node, depth):
"""重插入部分条目"""
# 按到中心距离排序
center = self._compute_center(node.compute_bounds())
node.entries.sort(
key=lambda e: self._compute_distance(
self._compute_center(e.bounds), center
),
reverse=True
)
# 重插入最远的条目
reinsert_count = int(math.ceil(len(node.entries) * self.reinsert_factor))
to_reinsert = node.entries[-reinsert_count:]
node.entries = node.entries[:-reinsert_count]
for entry in to_reinsert:
self._insert(entry, self.root, depth)
def _split_node(self, node):
"""分裂节点"""
# 沿最佳轴分裂
best_axis, best_distribution = self._choose_split_axis(node)
return self._distribute_entries(node, best_axis, best_distribution)
def _choose_split_axis(self, node):
"""选择最佳分裂轴"""
min_margin = float('inf')
best_axis = 0
best_distribution = 0
for axis in range(2): # 0=x, 1=y
# 按当前轴排序
node.entries.sort(key=lambda e: e.bounds[axis*2])
# 尝试不同的分布比例
for distribution in np.linspace(self.split_distribution,
1-self.split_distribution, 5):
split_pos = int(len(node.entries) * distribution)
# 计算边界和周长
group1 = node.entries[:split_pos]
group2 = node.entries[split_pos:]
bounds1 = self._compute_group_bounds(group1)
bounds2 = self._compute_group_bounds(group2)
margin = (self._compute_perimeter(bounds1) +
self._compute_perimeter(bounds2))
if margin < min_margin:
min_margin = margin
best_axis = axis
best_distribution = distribution
return best_axis, best_distribution
def _distribute_entries(self, node, axis, distribution):
"""按指定轴和分布比例分配条目"""
node.entries.sort(key=lambda e: e.bounds[axis*2])
split_pos = int(len(node.entries) * distribution)
# 创建两个新节点
node1 = RStarTreeNode(is_leaf=node.is_leaf)
node2 = RStarTreeNode(is_leaf=node.is_leaf)
node1.entries = node.entries[:split_pos]
node2.entries = node.entries[split_pos:]
return node1, node2
# 几何计算辅助方法
@staticmethod
def _merge_bounds(b1, b2):
return (
min(b1[0], b2[0]), min(b1[1], b2[1]),
max(b1[2], b2[2]), max(b1[3], b2[3])
)
@staticmethod
def _compute_area(bounds):
return (bounds[2] - bounds[0]) * (bounds[3] - bounds[1])
@staticmethod
def _compute_perimeter(bounds):
return 2 * ((bounds[2] - bounds[0]) + (bounds[3] - bounds[1]))
@staticmethod
def _compute_center(bounds):
return (
(bounds[0] + bounds[2]) / 2,
(bounds[1] + bounds[3]) / 2
)
@staticmethod
def _compute_distance(p1, p2):
return math.sqrt((p1[0]-p2[0])**2 + (p1[1]-p2[1])**2)
@staticmethod
def _compute_group_bounds(entries):
if not entries:
return (0, 0, 0, 0)
minx = min(e.bounds[0] for e in entries)
miny = min(e.bounds[1] for e in entries)
maxx = max(e.bounds[2] for e in entries)
maxy = max(e.bounds[3] for e in entries)
return (minx, miny, maxx, maxy)
class RStarTreeNode:
"""R*树节点"""
def __init__(self, is_leaf):
self.is_leaf = is_leaf
self.entries = [] # 条目列表
def compute_bounds(self):
"""计算节点边界"""
if not self.entries:
return (0, 0, 0, 0)
if self.is_leaf:
minx = min(e.bounds[0] for e in self.entries)
miny = min(e.bounds[1] for e in self.entries)
maxx = max(e.bounds[2] for e in self.entries)
maxy = max(e.bounds[3] for e in self.entries)
else:
minx = min(e.child.compute_bounds()[0] for e in self.entries)
miny = min(e.child.compute_bounds()[1] for e in self.entries)
maxx = max(e.child.compute_bounds()[2] for e in self.entries)
maxy = max(e.child.compute_bounds()[3] for e in self.entries)
return (minx, miny, maxx, maxy)
class RStarEntry:
"""R*树条目"""
def __init__(self, item, bounds, child=None):
self.item = item # 叶子节点存储数据项
self.bounds = bounds # (minx, miny, maxx, maxy)
self.child = child # 非叶子节点存储子节点
# 使用示例
if __name__ == "__main__":
# 创建R*树索引
rtree = RStarTree(max_entries=4)
# 插入一些数据
data = [
("A", (0, 0, 10, 10)),
("B", (5, 5, 15, 15)),
("C", (10, 10, 20, 20)),
("D", (15, 15, 25, 25)),
("E", (20, 20, 30, 30))
]
for item, bounds in data:
rtree.insert(item, bounds)
# 查询示例
query_bounds = (8, 8, 18, 18)
print(f"查询范围: {query_bounds}")
# 实际应用中需要实现查询方法
# results = rtree.query(query_bounds)
# print(f"查询结果: {results}")
二、并行空间计算
2.1 基于Dask的并行空间连接
import dask.dataframe as dd
import dask_geopandas as dg
import geopandas as gpd
from shapely.geometry import Point
import numpy as np
import time
def generate_points(num_points, bounds):
"""生成随机点数据"""
x = np.random.uniform(bounds[0], bounds[2], num_points)
y = np.random.uniform(bounds[1], bounds[3], num_points)
return gpd.GeoDataFrame({
'id': range(num_points),
'geometry': [Point(xi, yi) for xi, yi in zip(x, y)]
})
def generate_polygons(num_polygons, bounds):
"""生成随机多边形数据"""
polygons = []
size = min(bounds[2]-bounds[0], bounds[3]-bounds[1]) / 10
for i in range(num_polygons):
x = np.random.uniform(bounds[0], bounds[2]-size)
y = np.random.uniform(bounds[1], bounds[3]-size)
polygons.append({
'id': i,
'geometry': box(x, y, x+size, y+size)
})
return gpd.GeoDataFrame(polygons)
def spatial_join_parallel(points_df, polygons_df, n_partitions=4):
"""并行空间连接"""
# 转换为Dask GeoDataFrame
dask_points = dg.from_geopandas(points_df, npartitions=n_partitions)
dask_polygons = dg.from_geopandas(polygons_df, npartitions=n_partitions)
# 执行空间连接
start = time.time()
joined = dask_points.sjoin(dask_polygons, how='inner', predicate='within')
result = joined.compute()
elapsed = time.time() - start
return result, elapsed
def spatial_join_sequential(points_df, polygons_df):
"""顺序空间连接"""
start = time.time()
result = gpd.sjoin(points_df, polygons_df, how='inner', predicate='within')
elapsed = time.time() - start
return result, elapsed
# 性能对比测试
if __name__ == "__main__":
# 生成测试数据
bounds = (0, 0, 1000, 1000)
points = generate_points(100000, bounds)
polygons = generate_polygons(1000, bounds)
print(f"点数据量: {len(points)}")
print(f"多边形数据量: {len(polygons)}")
# 顺序执行
_, seq_time = spatial_join_sequential(points, polygons)
print(f"顺序执行时间: {seq_time:.2f}s")
# 并行执行(4分区)
_, parallel_time = spatial_join_parallel(points, polygons, n_partitions=4)
print(f"并行执行时间(4分区): {parallel_time:.2f}s")
print(f"加速比: {seq_time/parallel_time:.2f}x")
# 并行执行(8分区)
_, parallel_time = spatial_join_parallel(points, polygons, n_partitions=8)
print(f"并行执行时间(8分区): {parallel_time:.2f}s")
print(f"加速比: {seq_time/parallel_time:.2f}x")
2.2 基于Ray的分布式空间分析
import ray
import numpy as np
from shapely.geometry import Point, Polygon
import geopandas as gpd
import time
# 初始化Ray
ray.init(num_cpus=8)
@ray.remote
class SpatialWorker:
"""空间分析工作节点"""
def __init__(self, points):
self.points = points
def count_points_in_polygon(self, polygon):
"""计算多边形内的点数量"""
count = 0
for point in self.points.geometry:
if polygon.contains(point):
count += 1
return count
def compute_density(self, polygon, area):
"""计算点密度"""
count = self.count_points_in_polygon(polygon)
return count / area if area > 0 else 0
def distributed_spatial_analysis(points, polygons, num_workers=4):
"""分布式空间分析"""
# 分割点数据
chunk_size = len(points) // num_workers
point_chunks = [
points.iloc[i*chunk_size : (i+1)*chunk_size]
for i in range(num_workers)
]
# 创建远程工作节点
workers = [SpatialWorker.remote(chunk) for chunk in point_chunks]
results = []
for _, row in polygons.iterrows():
polygon = row['geometry']
area = row['area']
# 分布式计算密度
density_futures = [
worker.compute_density.remote(polygon, area)
for worker in workers
]
densities = ray.get(density_futures)
total_density = sum(densities)
results.append({
'polygon_id': row['id'],
'density': total_density
})
return gpd.GeoDataFrame(results)
# 生成测试数据
def generate_data(num_points=100000, num_polygons=100):
bounds = (0, 0, 1000, 1000)
# 生成随机点
x = np.random.uniform(bounds[0], bounds[2], num_points)
y = np.random.uniform(bounds[1], bounds[3], num_points)
points = gpd.GeoDataFrame({
'id': range(num_points),
'geometry': [Point(xi, yi) for xi, yi in zip(x, y)]
})
# 生成随机多边形
polygons = []
size = 50
for i in range(num_polygons):
x = np.random.uniform(bounds[0], bounds[2]-size)
y = np.random.uniform(bounds[1], bounds[3]-size)
polygon = Polygon([(x, y), (x+size, y), (x+size, y+size), (x, y+size)])
polygons.append({
'id': i,
'geometry': polygon,
'area': size * size
})
return points, gpd.GeoDataFrame(polygons)
# 性能对比测试
if __name__ == "__main__":
# 生成测试数据
points, polygons = generate_data()
print(f"点数据量: {len(points)}")
print(f"多边形数据量: {len(polygons)}")
# 顺序执行
start = time.time()
results_seq = []
for _, row in polygons.iterrows():
count = 0
for point in points.geometry:
if row['geometry'].contains(point):
count += 1
density = count / row['area']
results_seq.append({
'polygon_id': row['id'],
'density': density
})
seq_time = time.time() - start
# 分布式执行(4 workers)
start = time.time()
results_dist = distributed_spatial_analysis(points, polygons, num_workers=4)
dist_time = time.time() - start
print(f"顺序执行时间: {seq_time:.2f}s")
print(f"分布式执行时间(4 workers): {dist_time:.2f}s")
print(f"加速比: {seq_time/dist_time:.2f}x")
# 验证结果
assert len(results_dist) == len(results_seq)
print("结果验证通过")
ray.shutdown()
三、GPU加速空间计算
3.1 基于Rapids的GPU空间连接
import cudf
import cuspatial
import numpy as np
import time
def generate_gpu_data(num_points=1000000, num_polygons=1000):
"""生成GPU测试数据"""
# 生成随机点
x = np.random.uniform(0, 1000, num_points)
y = np.random.uniform(0, 1000, num_points)
points = cudf.DataFrame({'x': x, 'y': y})
# 生成随机多边形(矩形)
polygon_size = 10
poly_x = np.random.uniform(0, 1000-polygon_size, num_polygons)
poly_y = np.random.uniform(0, 1000-polygon_size, num_polygons)
# 构建cuspatial需要的多边形格式
poly_offsets = cudf.Series(range(0, num_polygons*5, 5), dtype='int32')
poly_points_x = []
poly_points_y = []
for i in range(num_polygons):
poly_points_x.extend([
poly_x[i], poly_x[i]+polygon_size,
poly_x[i]+polygon_size, poly_x[i], poly_x[i]
])
poly_points_y.extend([
poly_y[i], poly_y[i],
poly_y[i]+polygon_size, poly_y[i]+polygon_size, poly_y[i]
])
poly_points_x = cudf.Series(poly_points_x)
poly_points_y = cudf.Series(poly_points_y)
return points, (poly_offsets, poly_points_x, poly_points_y)
def gpu_spatial_join(points, polygons):
"""GPU空间连接"""
# 解构多边形数据
poly_offsets, poly_points_x, poly_points_y = polygons
# 执行空间连接
result = cuspatial.point_in_polygon(
points['x'], points['y'],
poly_offsets, poly_points_x, poly_points_y
)
# 统计每个多边形内的点数量
point_counts = result.iloc[:, 1:].sum()
return point_counts
# 性能对比测试
if __name__ == "__main__":
# 生成测试数据
points, polygons = generate_gpu_data()
print(f"点数据量: {len(points)}")
print(f"多边形数据量: {len(polygons[0])}")
# GPU执行
start = time.time()
gpu_counts = gpu_spatial_join(points, polygons)
gpu_time = time.time() - start
print(f"GPU执行时间: {gpu_time:.4f}s")
print(f"每个多边形平均点数: {gpu_counts.mean():.2f}")
# CPU对比(使用geopandas)
import geopandas as gpd
from shapely.geometry import Point, Polygon
# 准备CPU数据
points_cpu = gpd.GeoDataFrame({
'geometry': [Point(x, y) for x, y in zip(
points['x'].to_array(),
points['y'].to_array()
)]
})
polygons_cpu = []
poly_offsets, poly_points_x, poly_points_y = polygons
poly_points_x = poly_points_x.to_array()
poly_points_y = poly_points_y.to_array()
for i in range(len(poly_offsets)-1):
start_idx = poly_offsets.iloc[i]
end_idx = poly_offsets.iloc[i+1] if i+1 < len(poly_offsets) else len(poly_points_x)
coords = list(zip(
poly_points_x[start_idx:end_idx],
poly_points_y[start_idx:end_idx]
))
polygons_cpu.append(Polygon(coords))
polygons_cpu = gpd.GeoDataFrame({
'geometry': polygons_cpu
})
# CPU执行
start = time.time()
cpu_counts = []
for poly in polygons_cpu.geometry:
count = 0
for point in points_cpu.geometry:
if poly.contains(point):
count += 1
cpu_counts.append(count)
cpu_time = time.time() - start
print(f"\nCPU执行时间: {cpu_time:.4f}s")
print(f"加速比: {cpu_time/gpu_time:.2f}x")
# 验证结果
gpu_counts = gpu_counts.to_array()
assert np.allclose(gpu_counts, cpu_counts)
print("结果验证通过")
3.2 基于PyTorch的GPU空间插值
import torch
import numpy as np
import time
from sklearn.metrics import mean_squared_error
class GPUIDW:
"""GPU加速的反距离加权插值"""
def __init__(self, power=2, k_neighbors=10):
self.power = power # 距离幂次
self.k = k_neighbors # 最近邻数量
def fit(self, points, values):
"""拟合模型"""
# 转换为PyTorch张量并移动到GPU
self.points = torch.tensor(points, dtype=torch.float32).cuda()
self.values = torch.tensor(values, dtype=torch.float32).cuda()
def predict(self, query_points):
"""预测查询点值"""
query_points = torch.tensor(query_points, dtype=torch.float32).cuda()
# 计算所有点对距离
dists = torch.cdist(query_points, self.points)
# 找到k个最近邻
knn_dists, knn_indices = torch.topk(dists, self.k, largest=False, dim=1)
# 计算权重
weights = 1.0 / (knn_dists ** self.power + 1e-10)
weights = weights / weights.sum(dim=1, keepdim=True)
# 加权平均
knn_values = self.values[knn_indices]
preds = (weights * knn_values).sum(dim=1)
return preds.cpu().numpy()
# 生成测试数据
def generate_data(n_samples=10000, n_query=1000):
np.random.seed(42)
points = np.random.rand(n_samples, 2) * 100
values = np.sin(points[:,0]/10) + np.cos(points[:,1]/10) + np.random.normal(0, 0.1, n_samples)
query_points = np.random.rand(n_query, 2) * 100
return points, values, query_points
# CPU实现对比
def cpu_idw(points, values, query_points, power=2, k=10):
from scipy.spatial import cKDTree
tree = cKDTree(points)
dists, indices = tree.query(query_points, k=k)
preds = []
for i in range(len(query_points)):
if np.any(dists[i] == 0):
# 查询点与样本点重合
idx = np.where(dists[i] == 0)[0][0]
pred = values[indices[i][idx]]
else:
weights = 1.0 / (dists[i] ** power)
weights /= weights.sum()
pred = np.sum(weights * values[indices[i]])
preds.append(pred)
return np.array(preds)
# 性能对比测试
if __name__ == "__main__":
# 生成数据
points, values, query_points = generate_data(n_samples=100000, n_query=10000)
# GPU加速IDW
gpu_idw = GPUIDW(power=2, k_neighbors=10)
gpu_idw.fit(points, values)
start = time.time()
gpu_preds = gpu_idw.predict(query_points)
gpu_time = time.time() - start
print(f"GPU IDW执行时间: {gpu_time:.4f}s")
# CPU IDW
start = time.time()
cpu_preds = cpu_idw(points, values, query_points)
cpu_time = time.time() - start
print(f"CPU IDW执行时间: {cpu_time:.4f}s")
print(f"加速比: {cpu_time/gpu_time:.2f}x")
# 验证结果
mse = mean_squared_error(cpu_preds, gpu_preds)
print(f"结果MSE: {mse:.6f}")
四、近似算法与空间计算优化
4.1 空间范围计数的HyperLogLog算法
import mmh3
import math
import numpy as np
from typing import Dict, Tuple
class SpatialHLL:
"""基于HyperLogLog的空间范围计数"""
def __init__(self, bounds: Tuple[float, float, float, float],
precision: int = 12, grid_resolution: int = 100):
"""
:param bounds: (minx, miny, maxx, maxy) 空间范围
:param precision: HLL精度参数(典型值10-16)
:param grid_resolution: 网格分辨率
"""
self.bounds = bounds
self.p = precision
self.m = 1 << precision # 寄存器数量
self.alpha = self._get_alpha()
# 初始化网格
self.grid_resolution = grid_resolution
self.grid_cell_width = (bounds[2] - bounds[0]) / grid_resolution
self.grid_cell_height = (bounds[3] - bounds[1]) / grid_resolution
# 每个网格单元维护一个HLL
self.grid = [[self._create_hll()
for _ in range(grid_resolution)]
for _ in range(grid_resolution)]
def _create_hll(self):
"""创建新的HLL结构"""
return [0] * self.m
def _get_alpha(self):
"""计算HLL校正因子"""
if self.p == 4:
return 0.673
elif self.p == 5:
return 0.697
elif self.p == 6:
return 0.709
else:
return 0.7213 / (1 + 1.079 / self.m)
def _get_grid_position(self, x: float, y: float) -> Tuple