1. Metadata Management Implementation
1.1 Metadata Collection Engine (Python)
```python
import json
import xml.etree.ElementTree as ET

import pandas as pd
from sqlalchemy import create_engine, text


class MetadataCollector:
    def __init__(self, config):
        self.db_engine = create_engine(config['db_uri'])
        self.metadata_store = {}

    def extract_rdbms_metadata(self, schema):
        """Collect column-level metadata from a relational database."""
        query = text("""
            SELECT table_name, column_name, data_type
            FROM information_schema.columns
            WHERE table_schema = :schema
        """)
        df = pd.read_sql(query, self.db_engine, params={'schema': schema})
        self._store_metadata('rdbms', df.to_dict('records'))

    def extract_api_metadata(self, api_spec):
        """Collect endpoint metadata from a JSON (OpenAPI-style) or XML spec file."""
        if api_spec.endswith('.json'):
            with open(api_spec) as f:
                data = json.load(f)
            endpoints = data['paths']
        elif api_spec.endswith('.xml'):
            tree = ET.parse(api_spec)
            root = tree.getroot()
            endpoints = {elem.tag: elem.attrib for elem in root}
        else:
            raise ValueError(f"Unsupported API spec format: {api_spec}")
        # Normalize to a list of records so all source types are stored the same way.
        self._store_metadata('api', [
            {'endpoint': path, 'spec': spec} for path, spec in endpoints.items()
        ])

    def _store_metadata(self, source_type, metadata):
        """Store collected metadata in memory, grouped by source type."""
        if source_type not in self.metadata_store:
            self.metadata_store[source_type] = []
        self.metadata_store[source_type].extend(metadata)

    def generate_data_catalog(self):
        """Generate a data catalog from the collected metadata."""
        catalog = {'tables': [], 'apis': []}
        # Group column records by table so each table appears once in the catalog.
        tables = {}
        for item in self.metadata_store.get('rdbms', []):
            table = tables.setdefault(item['table_name'], {
                'name': item['table_name'],
                'columns': []
            })
            table['columns'].append({
                'name': item['column_name'],
                'type': item['data_type']
            })
        catalog['tables'] = list(tables.values())
        for item in self.metadata_store.get('api', []):
            spec = item.get('spec') or {}
            catalog['apis'].append({
                'endpoint': item['endpoint'],
                'parameters': spec.get('parameters', [])
            })
        return catalog
```
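A minimal usage sketch is shown below. The connection URI, schema name, and API spec file are placeholders; any OpenAPI-style JSON document with a top-level `paths` object would work as the API source.

```python
# Hypothetical configuration: replace the URI and the spec file with real ones.
config = {'db_uri': 'postgresql://user:password@localhost:5432/warehouse'}

collector = MetadataCollector(config)
collector.extract_rdbms_metadata('public')          # scan one database schema
collector.extract_api_metadata('orders_api.json')   # scan one API spec (placeholder file)
catalog = collector.generate_data_catalog()

print(len(catalog['tables']), 'tables,', len(catalog['apis']), 'endpoints')
```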
1.2 Metadata Lineage Analysis (NetworkX)
```python
import networkx as nx
import matplotlib.pyplot as plt


class LineageAnalyzer:
    def __init__(self):
        self.graph = nx.DiGraph()

    def add_data_flow(self, source, target, transformation):
        """Register a data flow edge from source to target, labeled with its transformation."""
        self.graph.add_edge(source, target, label=transformation)

    def visualize_lineage(self, output_file=None):
        """Render the lineage graph, saving to a file if a path is given."""
        pos = nx.spring_layout(self.graph)
        plt.figure(figsize=(12, 8))
        nx.draw_networkx_nodes(self.graph, pos, node_size=2000)
        nx.draw_networkx_edges(self.graph, pos,
                               arrowstyle='->',
                               arrowsize=20,
                               edge_color='gray')
        nx.draw_networkx_labels(self.graph, pos)
        edge_labels = nx.get_edge_attributes(self.graph, 'label')
        nx.draw_networkx_edge_labels(self.graph, pos,
                                     edge_labels=edge_labels)
        if output_file:
            plt.savefig(output_file)
        else:
            plt.show()

    def trace_impact(self, start_node):
        """Impact analysis: all downstream nodes reachable from start_node."""
        return nx.descendants(self.graph, start_node)

    def trace_origin(self, end_node):
        """Origin (provenance) analysis: all upstream nodes feeding end_node."""
        return nx.ancestors(self.graph, end_node)
```
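For illustration, a hypothetical ETL chain (the table and transformation names are made up) can be registered and then queried in both directions:

```python
lineage = LineageAnalyzer()
lineage.add_data_flow('ods.orders', 'dwd.orders_clean', 'deduplicate + standardize')
lineage.add_data_flow('dwd.orders_clean', 'dws.daily_sales', 'aggregate by day')
lineage.add_data_flow('dws.daily_sales', 'ads.sales_dashboard', 'join dimensions')

# Everything downstream of the raw table, i.e. what a schema change would affect.
print(lineage.trace_impact('ods.orders'))
# Everything upstream of the dashboard, i.e. where its numbers come from.
print(lineage.trace_origin('ads.sales_dashboard'))

lineage.visualize_lineage('lineage.png')
```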
2. Data Quality Checks
2.1 Data Quality Rule Engine
```python
from typing import List, Optional

import pandas as pd
from pydantic import BaseModel, validator


class DataQualityRule(BaseModel):
    rule_name: str
    rule_type: str  # completeness, validity, consistency, etc.
    severity: str   # critical, warning, info

    @validator('rule_type')
    def validate_rule_type(cls, v):
        valid_types = ['completeness', 'validity',
                       'consistency', 'uniqueness']
        if v not in valid_types:
            raise ValueError(f"Invalid rule type. Must be one of {valid_types}")
        return v


class CompletenessRule(DataQualityRule):
    column: str
    threshold: float = 0.95

    def evaluate(self, df):
        """Completeness check: share of non-null values in the column."""
        null_count = df[self.column].isnull().sum()
        total = len(df)
        completeness = 1 - (null_count / total)
        return {
            'passed': completeness >= self.threshold,
            'score': completeness,
            'message': f"Completeness {completeness:.2%} (threshold: {self.threshold:.0%})"
        }


class ValidityRule(DataQualityRule):
    column: str
    valid_values: Optional[List] = None
    min_value: Optional[float] = None
    max_value: Optional[float] = None

    def evaluate(self, df):
        """Validity check: values must be in valid_values or within [min_value, max_value]."""
        if self.valid_values:
            invalid = ~df[self.column].isin(self.valid_values)
        elif self.min_value is not None or self.max_value is not None:
            invalid = pd.Series(False, index=df.index)
            if self.min_value is not None:
                invalid |= (df[self.column] < self.min_value)
            if self.max_value is not None:
                invalid |= (df[self.column] > self.max_value)
        else:
            # No constraint configured: treat every row as valid.
            invalid = pd.Series(False, index=df.index)
        invalid_count = int(invalid.sum())
        validity = 1 - (invalid_count / len(df))
        return {
            'passed': validity >= 0.99,  # default validity threshold of 99%
            'score': validity,
            'message': f"Validity {validity:.2%} (invalid: {invalid_count})"
        }


class DataQualityEngine:
    def __init__(self, rules):
        self.rules = rules

    def run_checks(self, df):
        """Run every configured rule against the DataFrame."""
        results = []
        for rule in self.rules:
            try:
                result = rule.evaluate(df)
                results.append({
                    'rule': rule.rule_name,
                    'type': rule.rule_type,
                    'severity': rule.severity,
                    **result
                })
            except Exception as e:
                results.append({
                    'rule': rule.rule_name,
                    'error': str(e),
                    'passed': False
                })
        return results

    def generate_report(self, results):
        """Aggregate check results into a quality report."""
        passed = sum(1 for r in results if r.get('passed', False))
        total = len(results)
        score = passed / total if total > 0 else 1.0
        report = {
            'summary': {
                'score': score,
                'passed': passed,
                'failed': total - passed,
                'critical_issues': sum(
                    1 for r in results
                    if not r.get('passed', False) and r.get('severity') == 'critical'
                )
            },
            'details': results
        }
        return report
```
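A short, self-contained sketch of the engine in action; the toy DataFrame, column names, and thresholds are illustrative only:

```python
import pandas as pd

df = pd.DataFrame({
    'customer_id': [1, 2, 3, 4, None],
    'status': ['active', 'inactive', 'active', 'unknown', 'active'],
    'age': [34, 29, 151, 42, 38],
})

rules = [
    CompletenessRule(rule_name='customer_id_not_null', rule_type='completeness',
                     severity='critical', column='customer_id', threshold=0.99),
    ValidityRule(rule_name='status_in_domain', rule_type='validity',
                 severity='warning', column='status',
                 valid_values=['active', 'inactive']),
    ValidityRule(rule_name='age_in_range', rule_type='validity',
                 severity='warning', column='age', min_value=0, max_value=120),
]

engine = DataQualityEngine(rules)
report = engine.generate_report(engine.run_checks(df))
print(report['summary'])
```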
3. Data Security and Masking
3.1 Dynamic Data Masking (Python)
```python
import hashlib
import re
from typing import Any, Callable, Optional


class DataMasker:
    @staticmethod
    def mask_phone(value: str) -> str:
        """Mask the middle four digits of a phone number."""
        if not value or not isinstance(value, str):
            return value
        return re.sub(r'(\d{3})\d{4}(\d{4})', r'\1****\2', value)

    @staticmethod
    def mask_email(value: str) -> str:
        """Mask the local part of an email address, keeping only its first character."""
        if not value or not isinstance(value, str):
            return value
        parts = value.split('@')
        if len(parts) == 2:
            return f"{parts[0][0]}***@{parts[1]}"
        return value

    @staticmethod
    def mask_id_card(value: str) -> str:
        """Mask the birth-date digits of an 18- or 15-digit national ID number."""
        if not value or not isinstance(value, str):
            return value
        length = len(value)
        if length == 18:
            return f"{value[:6]}********{value[-4:]}"
        elif length == 15:
            return f"{value[:6]}*****{value[-4:]}"
        return value

    @staticmethod
    def hash_value(value: Any, salt: str = "") -> Optional[str]:
        """Irreversibly mask a value with a salted SHA-256 hash."""
        if value is None:
            return None
        text = f"{value}{salt}".encode('utf-8')
        return hashlib.sha256(text).hexdigest()

    @classmethod
    def get_mask_func(cls, mask_type: str) -> Callable:
        """Look up the masking function for a mask type; unknown types pass through unchanged."""
        mapping = {
            'phone': cls.mask_phone,
            'email': cls.mask_email,
            'id_card': cls.mask_id_card,
            'hash': cls.hash_value
        }
        return mapping.get(mask_type, lambda x: x)


class DynamicMasking:
    def __init__(self, policies):
        self.policies = policies  # {'column_name': 'mask_type'}
        self.masker = DataMasker()

    def apply_masking(self, df):
        """Apply the masking policies to a pandas DataFrame."""
        masked_df = df.copy()
        for col, mask_type in self.policies.items():
            if col in masked_df.columns:
                mask_func = self.masker.get_mask_func(mask_type)
                masked_df[col] = masked_df[col].apply(mask_func)
        return masked_df

    def mask_record(self, record: dict) -> dict:
        """Mask a single record (dict), field by field."""
        return {
            key: self.masker.get_mask_func(self.policies.get(key, ''))(value)
            for key, value in record.items()
        }
```
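A brief usage sketch; the policy maps column names to mask types, and the sample values are fabricated:

```python
import pandas as pd

policies = {'phone': 'phone', 'email': 'email', 'id_no': 'id_card', 'name': 'hash'}
masking = DynamicMasking(policies)

df = pd.DataFrame([{
    'name': 'Zhang San',
    'phone': '13812345678',
    'email': 'zhangsan@example.com',
    'id_no': '110101199003070011',
}])

print(masking.apply_masking(df))
print(masking.mask_record({'phone': '13812345678', 'email': 'zhangsan@example.com'}))
```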
3.2 Attribute-Based Access Control (ABAC)
```python
from functools import lru_cache
from typing import Any, Dict


class ABACPolicy:
    def __init__(self, effect: str, conditions: Dict[str, Any]):
        self.effect = effect  # 'allow' or 'deny'
        self.conditions = conditions

    def evaluate(self, attributes: Dict[str, Any]) -> bool:
        """Return True only if every condition matches the request attributes."""
        for key, expected in self.conditions.items():
            actual = attributes.get(key)
            if actual != expected:
                return False
        return True


class ABACEngine:
    def __init__(self):
        self.policies = []

    def add_policy(self, policy: ABACPolicy):
        """Register a policy; policies are evaluated in insertion order."""
        self.policies.append(policy)

    def authorize(self, attributes: Dict[str, Any]) -> bool:
        """Authorization decision: the first matching policy wins."""
        for policy in self.policies:
            if policy.evaluate(attributes):
                return policy.effect == 'allow'
        return False  # deny by default

    @lru_cache(maxsize=1024)
    def check_access(self, user: str, resource: str, action: str) -> bool:
        """Cached access check."""
        # In a real implementation the attributes would be fetched from external systems.
        user_attrs = self._get_user_attributes(user)
        resource_attrs = self._get_resource_attributes(resource)
        action_attrs = {'action': action}
        attributes = {**user_attrs, **resource_attrs, **action_attrs}
        return self.authorize(attributes)

    def _get_user_attributes(self, user: str) -> Dict[str, Any]:
        """Fetch user attributes (mock implementation)."""
        # In a real project these might come from LDAP or an IAM system.
        return {
            'user.id': user,
            'user.department': 'finance',
            'user.role': 'analyst'
        }

    def _get_resource_attributes(self, resource: str) -> Dict[str, Any]:
        """Fetch resource attributes (mock implementation)."""
        return {
            'resource.id': resource,
            'resource.sensitivity': 'high',
            'resource.owner': 'finance'
        }
```
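A small usage sketch; the attribute keys mirror the mock attribute providers above, and the policy shown is only an example:

```python
engine = ABACEngine()

# Finance analysts may read highly sensitive resources owned by finance.
engine.add_policy(ABACPolicy('allow', {
    'user.department': 'finance',
    'user.role': 'analyst',
    'resource.sensitivity': 'high',
    'resource.owner': 'finance',
    'action': 'read',
}))

print(engine.check_access('alice', 'finance.customer_pii', 'read'))    # True
print(engine.check_access('alice', 'finance.customer_pii', 'delete'))  # False (no matching policy)
```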
4. Data Governance Monitoring and Alerting
4.1 Prometheus Monitoring Integration
```python
import time

from prometheus_client import start_http_server, Gauge, Counter


class DataGovernanceMetrics:
    def __init__(self):
        self.data_quality_score = Gauge(
            'data_quality_score',
            'Overall data quality score',
            ['domain']
        )
        self.sensitive_data_access = Counter(
            'sensitive_data_access_count',
            'Count of sensitive data access',
            ['user', 'resource']
        )
        self.rule_violations = Counter(
            'data_rule_violations',
            'Count of data rule violations',
            ['rule_type', 'severity']
        )

    def update_quality_score(self, domain: str, score: float):
        self.data_quality_score.labels(domain=domain).set(score)

    def record_access(self, user: str, resource: str):
        self.sensitive_data_access.labels(
            user=user, resource=resource
        ).inc()

    def record_violation(self, rule_type: str, severity: str):
        self.rule_violations.labels(
            rule_type=rule_type, severity=severity
        ).inc()


# Usage example
if __name__ == '__main__':
    metrics = DataGovernanceMetrics()
    start_http_server(8000)
    # Simulate periodic metric updates
    while True:
        metrics.update_quality_score('customer', 0.95)
        metrics.record_access('user1', 'customers.pii')
        metrics.record_violation('completeness', 'warning')
        time.sleep(10)
```
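The monitoring piece can also be fed from the quality engine of section 2.1. The helper below is a hedged sketch of that wiring, assuming both classes are importable in the same process:

```python
def publish_quality_report(metrics: DataGovernanceMetrics, domain: str, report: dict):
    """Push a DataQualityEngine report into the Prometheus metrics defined above."""
    metrics.update_quality_score(domain, report['summary']['score'])
    for detail in report['details']:
        if not detail.get('passed', False):
            metrics.record_violation(
                rule_type=detail.get('type', 'unknown'),
                severity=detail.get('severity', 'info'),
            )
```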
5. Summary and Best Practices
- Code-level governance: embed governance rules directly in data processing pipelines
- Automated testing: add data quality checks to the CI/CD pipeline (a pytest sketch follows this list)
- Phased implementation: start with critical data domains and expand step by step
- Monitoring-driven: build a real-time data governance monitoring system
- Security first: enforce the principle of least privilege by default
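As a sketch of the automated-testing point, a quality gate can be written as an ordinary pytest test that fails the pipeline on critical violations. The file path, rule set, and threshold below are placeholders built on the classes from section 2.1:

```python
# test_data_quality.py -- a hypothetical CI/CD quality gate
import pandas as pd


def test_customer_table_quality():
    # In CI this would read a sample or staging extract of the real table.
    df = pd.read_parquet('artifacts/customer_sample.parquet')

    rules = [
        CompletenessRule(rule_name='customer_id_not_null', rule_type='completeness',
                         severity='critical', column='customer_id', threshold=0.99),
    ]
    engine = DataQualityEngine(rules)
    report = engine.generate_report(engine.run_checks(df))

    # Fail the build on any critical issue.
    assert report['summary']['critical_issues'] == 0, report['details']
```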
The code examples above show how the key capabilities of data governance can be implemented at the technical level. In a real project, these components should be integrated with the organization's existing data platform to form a complete data governance technology stack.