Contents
  Part 1: Large-Scale Web Crawler System
    1. Crawler System Architecture Overview
    2. URL Frontier Design
      2.1 Basic Architecture
      2.2 Priority Rules
    3. Politeness
      3.1 Parsing robots.txt
      3.2 Processing sitemap.xml
    4. Deduplication
      4.1 Bloom Filter: Exact-Match URL Deduplication
      4.2 SimHash: Near-Duplicate Content Detection
    5. Distributed Crawler Architecture
      5.1 Partitioning URLs by Host
      5.2 Component Interaction in a Distributed Crawler
  Part 2: Search Suggestion System
    6. Trie: The Prefix-Matching Engine
      6.1 Basic Trie Implementation
      6.2 Edit Distance (Damerau-Levenshtein)
    7. Real-Time Updates and Trending Queries
      7.1 Real-Time Update Pipeline Architecture
      7.2 Flink Real-Time Aggregation Example
    8. Overall Architecture and Performance Optimization
      8.1 Complete Search Suggestion Architecture
      8.2 Double-Buffered Trie Updates

System Design: Web Crawler and Search Suggestion Systems

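This article examines two key systems from a system-design perspective: a large-scale web crawler and a real-time search suggestion system. The crawler involves URL scheduling strategies, deduplication algorithms, politeness control, and distributed architecture. The suggestion system covers Trie indexes, edit-distance spelling correction, trending-query aggregation, and real-time update pipelines. Both are widely used in modern search engines, e-commerce platforms, and content-aggregation products.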

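Part 1: Large-Scale Web Crawler System

1. Crawler System Architecture Overview

A mature large-scale crawler is built from the following core modules:

Seed URLs → URL Frontier (scheduler) → Fetcher (downloader) → Parser
                  ↑                                             │
                  │                                       ┌─────┴───────┐
                  │                                       ▼             ▼
          URL Filter (dedup) ←── Extracted URLs ←── Link Extractor   Storage

Module responsibilities:

  • Seed URLs: the starting points of the crawl, usually the home pages or sitemaps of high-quality sites
  • URL Frontier: manages the queue of URLs waiting to be crawled and decides crawl order and frequency
  • Fetcher: downloads pages over HTTP and handles redirects, timeouts, rate limits, and similar concerns
  • Parser: extracts structured information (text, links, metadata) from HTML
  • Dedup (URL Filter): detects and filters out URLs that have already been crawled or are duplicates, so no resources are wasted on them
  • Storage: persists raw pages and the parsed structured data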
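To make the hand-offs between these modules concrete, here is a minimal single-threaded sketch. It assumes a caller-supplied `fetch` function (a hypothetical stand-in for the Fetcher that returns HTML or `None` on failure) and uses the standard library's `html.parser` for link extraction. A plain FIFO stands in for the Frontier and an in-memory set for the Dedup module; later sections replace each of these with something more scalable.

```python
from collections import deque
from html.parser import HTMLParser
from typing import Callable, Dict, List, Optional
from urllib.parse import urldefrag, urljoin


class LinkExtractor(HTMLParser):
    """Link Extractor: collects href values from <a> tags."""

    def __init__(self) -> None:
        super().__init__()
        self.links: List[str] = []

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            for name, value in attrs:
                if name == "href" and value:
                    self.links.append(value)


def crawl(seeds: List[str], fetch: Callable[[str], Optional[str]],
          max_pages: int = 100) -> Dict[str, str]:
    """Wire Frontier -> Fetcher -> Parser -> Link Extractor -> Dedup -> Frontier.

    fetch is a hypothetical Fetcher stand-in: it returns HTML, or None on failure.
    """
    frontier = deque(seeds)          # URL Frontier (plain FIFO here)
    seen = set(seeds)                # Dedup (exact-match set)
    storage: Dict[str, str] = {}     # Storage (in-memory)
    while frontier and len(storage) < max_pages:
        url = frontier.popleft()
        html = fetch(url)            # Fetcher
        if html is None:
            continue
        storage[url] = html
        parser = LinkExtractor()     # Parser + Link Extractor
        parser.feed(html)
        for href in parser.links:
            # resolve relative links and drop #fragments before dedup
            absolute, _ = urldefrag(urljoin(url, href))
            if absolute.startswith(("http://", "https://")) and absolute not in seen:
                seen.add(absolute)
                frontier.append(absolute)
    return storage
```

For a quick offline test, pass a dictionary's `.get` method as `fetch`. Each page is then "downloaded" from memory, and relative links and fragments are resolved exactly as they would be for real responses.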

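2. URL Frontier Design

The URL Frontier is the crawler's central scheduling component. It decides "which URL to crawl next" and combines priority management with politeness control.

2.1 Basic Architecture

Frontier = Priority Queues + Per-domain Queues + Politeness Controller

[Priority 1 Queue] ──┐                    ┌──→ [domain1] ──→ Rate Limiter ──→ Fetcher
[Priority 2 Queue] ──┼──→ Per-domain FIFO ┼──→ [domain2] ──→ Rate Limiter ──→ Fetcher
[Priority 3 Queue] ──┘                    └──→ [domain3] ──→ Rate Limiter ──→ Fetcher
                                                  ...

Multi-level priority queue design: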

import time
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Deque, Dict, Optional, Set
from urllib.parse import urlparse


@dataclass
class CrawlURL:
    url: str
    priority: int          # 0 = highest, 1, 2, 3 = lowest
    depth: int             # current crawl depth
    add_time: float        # time the URL was enqueued
    retry_count: int = 0
    max_retries: int = 3


class URLFrontier:
    """URL scheduler: priority levels in front, per-domain FIFO queues behind them"""

    def __init__(self, num_priorities: int = 4, default_delay: float = 1.0):
        self.num_priorities = num_priorities
        self.default_delay = default_delay

        # priority -> domain -> FIFO queue of URLs
        self.queues: Dict[int, Dict[str, Deque[CrawlURL]]] = {
            p: defaultdict(deque) for p in range(num_priorities)
        }

        # Politeness control (per host, so shared across all priority levels)
        self.domain_last_access: Dict[str, float] = {}
        self.domain_crawl_delay: Dict[str, float] = {}

        # Exact-match URL dedup set (a Bloom filter replaces this at scale)
        self.seen_urls: Set[str] = set()

    def add_url(self, url: CrawlURL) -> None:
        """Add a URL to the scheduler."""
        if url.url in self.seen_urls:
            return  # already seen, skip
        self.seen_urls.add(url.url)

        # clamp out-of-range priorities into the valid levels
        priority = min(max(url.priority, 0), self.num_priorities - 1)
        domain = self._extract_domain(url.url)
        self.queues[priority][domain].append(url)

    def get_next_url(self) -> Optional[CrawlURL]:
        """Return the next crawlable URL: priority first, then politeness."""
        now = time.time()
        for priority in range(self.num_priorities):
            domain_queues = self.queues[priority]
            for domain in list(domain_queues):
                urls = domain_queues[domain]
                if not urls:
                    del domain_queues[domain]  # drop empty queues
                    continue

                # politeness: skip domains that were fetched too recently
                delay = self.domain_crawl_delay.get(domain, self.default_delay)
                last_access = self.domain_last_access.get(domain, 0.0)
                if now - last_access < delay:
                    continue

                next_url = urls.popleft()
                # round-robin: move this domain to the end so other domains get a turn
                del domain_queues[domain]
                if urls:
                    domain_queues[domain] = urls

                # record the access time for the politeness check
                self.domain_last_access[domain] = now
                return next_url

        return None  # nothing is crawlable right now

    def _extract_domain(self, url: str) -> str:
        """Extract the host (netloc) from a URL."""
        return urlparse(url).netloc.lower()

    def set_crawl_delay(self, domain: str, delay: float) -> None:
        """Set a domain's crawl delay (from robots.txt or configuration)."""
        self.domain_crawl_delay[domain] = delay
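`get_next_url` above scans every domain queue on each call, which costs O(number of domains). A common alternative, similar to the back-queue heap in the Mercator crawler design, keeps a min-heap keyed by each domain's next allowed fetch time so that the next eligible domain is always at the top. The sketch below is a simplified version that works on plain URL strings. Priorities are left out for brevity, and the injectable `now` argument exists only to make it testable.

```python
import heapq
import time
from collections import defaultdict, deque
from typing import Deque, Dict, List, Optional, Tuple
from urllib.parse import urlparse


class HeapPolitenessScheduler:
    """Min-heap of (next_allowed_time, domain).

    Invariant: a domain is in the heap exactly when its queue is non-empty.
    """

    def __init__(self, default_delay: float = 1.0):
        self.default_delay = default_delay
        self.queues: Dict[str, Deque[str]] = defaultdict(deque)
        self.heap: List[Tuple[float, str]] = []
        self.next_allowed: Dict[str, float] = {}
        self.delays: Dict[str, float] = {}

    def set_crawl_delay(self, domain: str, delay: float) -> None:
        self.delays[domain] = delay

    def add_url(self, url: str) -> None:
        domain = urlparse(url).netloc.lower()
        if not self.queues[domain]:
            # the domain becomes active again; keep the deadline left by its last fetch
            heapq.heappush(self.heap, (self.next_allowed.get(domain, 0.0), domain))
        self.queues[domain].append(url)

    def get_next_url(self, now: Optional[float] = None) -> Optional[str]:
        now = time.time() if now is None else now
        if not self.heap or self.heap[0][0] > now:
            return None  # nothing is due yet; a caller could sleep until heap[0][0]
        _, domain = heapq.heappop(self.heap)
        url = self.queues[domain].popleft()
        self.next_allowed[domain] = now + self.delays.get(domain, self.default_delay)
        if self.queues[domain]:
            heapq.heappush(self.heap, (self.next_allowed[domain], domain))
        return url
```

Each selection costs O(log D) for D active domains. When the heap top is not yet due, the crawler knows exactly how long it can sleep.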

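2.2 Priority Rules

from typing import Dict
from urllib.parse import urlparse


class PriorityAssigner:
    """URL priority assigner: scores a URL and maps the score to levels 0-3"""

    # Hypothetical example list; a real system would load this from configuration.
    NEWS_DOMAINS = {'news.example.com', 'www.example-news.org'}

    def assign_priority(self, url: str, context: Dict) -> int:
        score = 0

        # Rule 1: shallow URLs first
        depth = context.get('depth', 0)
        if depth <= 2:
            score += 3
        elif depth <= 5:
            score += 2
        else:
            score -= 1

        # Rule 2: URLs on high-PageRank domains first
        domain = self._extract_domain(url)
        pagerank = context.get('domain_pagerank', {}).get(domain, 0)
        if pagerank > 7:
            score += 5

        # Rule 3: time-sensitive content first (e.g. news sites)
        if self._is_news_domain(domain):
            score += 3

        # Rule 4: domains that have returned high-value content before come first
        historical_score = context.get('domain_value', {}).get(domain, 0)
        score += int(historical_score)

        # Map the score to a priority level
        if score >= 8:
            return 0  # highest priority
        elif score >= 5:
            return 1
        elif score >= 2:
            return 2
        return 3  # lowest priority

    def _extract_domain(self, url: str) -> str:
        return urlparse(url).netloc.lower()

    def _is_news_domain(self, domain: str) -> bool:
        return domain in self.NEWS_DOMAINS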

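3. Politeness

A crawler is "polite" when it respects each target site's crawl rules and resource limits and avoids putting a heavy load on the target servers.

3.1 Parsing robots.txt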

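import re
from typing import Dict, List
from urllib.parse import urlparse


class RobotsTxtParser:
    """robots.txt parser (rule groups and longest-match precedence as in RFC 9309)"""

    def __init__(self):
        self.rules: Dict[str, List[Dict]] = {}  # {user_agent (lowercase): [rule_dicts]}

    def parse(self, content: str):
        """Parse robots.txt content."""
        current_agents: List[str] = []
        last_was_agent = False
        for raw_line in content.splitlines():
            line = raw_line.split('#', 1)[0].strip()  # strip comments
            if not line or ':' not in line:
                continue  # skip blank and malformed lines

            key, value = line.split(':', 1)
            key = key.strip().lower()
            value = value.strip()

            if key == 'user-agent':
                # consecutive User-agent lines share one rule group
                if not last_was_agent:
                    current_agents = []
                agent = value.lower()
                current_agents.append(agent)
                self.rules.setdefault(agent, [])
                last_was_agent = True
                continue

            last_was_agent = False
            if not current_agents:
                continue  # rules before any User-agent line belong to no group

            if key in ('allow', 'disallow'):
                if not value:
                    continue  # an empty "Disallow:" disallows nothing
                rule = {'type': key, 'path': value,
                        'pattern': self._pattern_from_path(value)}
            elif key == 'crawl-delay':
                try:
                    rule = {'type': 'crawl-delay', 'value': float(value)}
                except ValueError:
                    continue
            else:
                continue  # e.g. Sitemap lines are handled separately

            for agent in current_agents:
                self.rules[agent].append(rule)

    def is_allowed(self, url: str, user_agent: str = '*') -> bool:
        """Check whether a URL may be crawled."""
        parsed = urlparse(url)
        path = parsed.path or '/'
        if parsed.query:
            path += '?' + parsed.query

        # Rule lookup order: the specific UA group, falling back to the wildcard group
        agent_rules = self.rules.get(user_agent.lower()) or self.rules.get('*', [])

        # The most specific (longest) matching rule wins; on a tie, Allow wins
        allowed, best_len = True, -1
        for rule in agent_rules:
            if rule['type'] not in ('allow', 'disallow'):
                continue
            if not rule['pattern'].match(path):
                continue
            length = len(rule['path'])
            if length > best_len or (length == best_len and rule['type'] == 'allow'):
                allowed = rule['type'] == 'allow'
                best_len = length
        return allowed

    def get_crawl_delay(self, user_agent: str = '*') -> float:
        """Get the crawl delay (a non-standard but widely used extension)."""
        agent_rules = self.rules.get(user_agent.lower()) or self.rules.get('*', [])
        for rule in agent_rules:
            if rule['type'] == 'crawl-delay':
                return rule['value']
        return 1.0  # default: 1 second

    def _pattern_from_path(self, path: str) -> re.Pattern:
        """Convert a robots.txt path pattern into a regular expression."""
        escaped = re.escape(path)
        escaped = escaped.replace(r'\*', '.*')  # * matches any character sequence
        if escaped.endswith(r'\$'):             # a trailing $ anchors the end of the path
            escaped = escaped[:-2] + '$'
        return re.compile(f'^{escaped}')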
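Python's standard library ships `urllib.robotparser`, which is handy for cross-checking a custom parser on simple files. It matches rules by plain path prefix in file order (the first matching rule wins) and does not expand the `*` and `$` wildcards. For files with overlapping Allow/Disallow rules, it can therefore disagree with the parser above. The robots.txt content and the `MyCrawler` user agent below are made up for illustration.

```python
from urllib import robotparser

ROBOTS_TXT = """\
User-agent: *
Disallow: /private/
Crawl-delay: 2
"""

rp = robotparser.RobotFileParser()
rp.parse(ROBOTS_TXT.splitlines())  # parse() takes an iterable of lines; no network access

print(rp.can_fetch("MyCrawler", "https://example.com/private/report.html"))  # False
print(rp.can_fetch("MyCrawler", "https://example.com/blog/post-1"))          # True
print(rp.crawl_delay("MyCrawler"))                                           # 2
```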

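3.2 Processing sitemap.xml

import xml.etree.ElementTree as ET  # for untrusted XML, the defusedxml package is safer
from typing import List, Tuple


class SitemapParser:
    """Sitemap parser: a shortcut to discovering high-quality URLs"""

    NS = {'ns': 'http://www.sitemaps.org/schemas/sitemap/0.9'}

    def parse_sitemap(self, content: str) -> List[Tuple[str, float]]:
        """Parse sitemap.xml and return (url, priority) pairs."""
        entries = []
        root = ET.fromstring(content)

        for url_elem in root.findall('.//ns:url', self.NS):
            loc = url_elem.find('ns:loc', self.NS)
            if loc is None or not loc.text:
                continue

            # <priority> is optional; the sitemap protocol's default is 0.5.
            # It can feed the URL's crawl priority. <changefreq> and <lastmod>
            # can be read the same way to drive recrawl scheduling.
            priority_elem = url_elem.find('ns:priority', self.NS)
            try:
                priority = float(priority_elem.text) if priority_elem is not None else 0.5
            except (TypeError, ValueError):
                priority = 0.5

            entries.append((loc.text.strip(), priority))

        return entries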

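4. Deduplication

Deduplication is the crawler's most important efficiency safeguard. It works at two levels: exact-match URL deduplication and near-duplicate content detection.

4.1 Bloom Filter: Exact-Match URL Deduplication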

import math

import mmh3                    # pip install mmh3
from bitarray import bitarray  # pip install bitarray


class BloomFilter:
    """Bloom filter: space-efficient set membership testing"""

    def __init__(self, capacity: int, error_rate: float = 0.001):
        """
        capacity: expected number of elements
        error_rate: target false-positive rate
        """
        # Optimal bit-array size m and number of hash functions k
        self.size = int(math.ceil(-capacity * math.log(error_rate) / (math.log(2) ** 2)))
        self.hash_count = max(1, round(self.size / capacity * math.log(2)))

        self.bit_array = bitarray(self.size)
        self.bit_array.setall(0)

    def add(self, item: str):
        """Add an element to the filter."""
        for seed in range(self.hash_count):
            index = mmh3.hash(item, seed) % self.size  # one MurmurHash3 seed per hash function
            self.bit_array[index] = 1

    def contains(self, item: str) -> bool:
        """False means definitely absent; True means possibly present."""
        for seed in range(self.hash_count):
            index = mmh3.hash(item, seed) % self.size
            if not self.bit_array[index]:
                return False
        return True  # possibly present (small chance of a false positive)

    def __contains__(self, item: str) -> bool:
        return self.contains(item)


# Usage example (allocates the full ~180 MB bit array)
bf = BloomFilter(capacity=100_000_000, error_rate=0.001)
bf.add('https://example.com/page1')
assert 'https://example.com/page1' in bf      # always True: no false negatives
assert 'https://example.com/page2' not in bf  # True with high probability

# Space analysis:
# 100 million URLs at a 0.1% false-positive rate -> about 1.44e9 bits, roughly 180 MB
# A hash set of 100 million URLs (about 60 bytes each) -> about 6 GB for the strings alone
# The Bloom filter saves roughly 97% of the space
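The sizing formulas behind the constructor above, worked through for the 100-million-URL example (n is the expected number of elements, p the target false-positive rate, m the number of bits, k the number of hash functions):

```latex
m = -\frac{n \ln p}{(\ln 2)^2}, \qquad k = \frac{m}{n}\ln 2, \qquad p \approx \left(1 - e^{-kn/m}\right)^{k}

n = 10^{8},\; p = 10^{-3}:\quad
m = \frac{10^{8} \times 6.908}{0.4805} \approx 1.44 \times 10^{9}\ \text{bits} \approx 1.8 \times 10^{8}\ \text{bytes} \approx 180\ \text{MB}

k \approx 14.38 \times 0.693 \approx 9.97 \approx 10, \qquad
\left(1 - e^{-10/14.38}\right)^{10} \approx 0.501^{10} \approx 0.001
```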

URL canonicalization, a necessary step before deduplication:

from urllib.parse import urlparse, urlunparse


class URLCanonicalizer:
    """URL canonicalizer: collapses URLs that are "different but equivalent" """

    def canonicalize(self, url: str) -> str:
        parsed = urlparse(url.strip())

        # Only the scheme and host are case-insensitive; path and query are case-sensitive
        scheme = (parsed.scheme or 'http').lower()
        netloc = parsed.netloc.lower()

        # Remove default ports
        if (scheme == 'http' and netloc.endswith(':80')) or \
           (scheme == 'https' and netloc.endswith(':443')):
            netloc = netloc.rsplit(':', 1)[0]

        # Normalize the path: resolve /../ and /./ and collapse empty segments.
        # This also drops trailing slashes, a common heuristic that is not
        # guaranteed to preserve equivalence on every server.
        path = self._normalize_path(parsed.path)

        # Sort query parameters
        query = '&'.join(sorted(parsed.query.split('&'))) if parsed.query else ''

        # Drop the fragment: it is never sent to the server
        fragment = ''

        return urlunparse((scheme, netloc, path, parsed.params, query, fragment))

    def _normalize_path(self, path: str) -> str:
        segments = []
        for seg in path.split('/'):
            if seg in ('.', ''):
                continue
            if seg == '..':
                if segments:
                    segments.pop()
            else:
                segments.append(seg)
        return '/' + '/'.join(segments)


# Example:
canonicalizer = URLCanonicalizer()
print(canonicalizer.canonicalize('HTTP://www.Example.com:80/a/../b/./c.html?b=2&a=1#section'))
# http://www.example.com/b/c.html?a=1&b=2

4.2 SimHash: Near-Duplicate Content Detection

import hashlib
from collections import defaultdict


class SimHash:
    """SimHash: detects near-duplicate page content"""

    def __init__(self, hash_bits: int = 64):
        # token hashes below are 64 bits wide, so hash_bits should not exceed 64
        self.hash_bits = hash_bits

    def compute(self, text: str) -> int:
        """Compute the SimHash fingerprint of a text."""
        # Tokenize and weight each token (term frequency serves as the weight here)
        tokens = text.lower().split()
        token_weights = defaultdict(int)
        for token in tokens:
            token_weights[token] += 1

        # Accumulate each token's hash bits by weight: +weight for a 1 bit, -weight for a 0 bit
        vector = [0] * self.hash_bits
        for token, weight in token_weights.items():
            token_hash = int(hashlib.md5(token.encode()).hexdigest()[:16], 16)  # 64-bit hash

            for i in range(self.hash_bits):
                bit = (token_hash >> i) & 1
                vector[i] += weight if bit else -weight

        # Build the fingerprint: bit i is 1 where the weighted sum is positive
        fingerprint = 0
        for i in range(self.hash_bits):
            if vector[i] > 0:
                fingerprint |= (1 << i)

        return fingerprint

    def hamming_distance(self, hash1: int, hash2: int) -> int:
        """Hamming distance between two SimHash fingerprints."""
        xor = hash1 ^ hash2
        distance = 0
        while xor:
            distance += 1
            xor &= xor - 1  # clear the lowest set bit
        return distance

    def is_duplicate(self, hash1: int, hash2: int, threshold: int = 3) -> bool:
        """Near-duplicate if the Hamming distance is <= threshold."""
        return self.hamming_distance(hash1, hash2) <= threshold


# Usage example:
simhash = SimHash()
h1 = simhash.compute("Python is a great programming language for data science")
h2 = simhash.compute("Python is a great programming language for data scientists")
print(simhash.hamming_distance(h1, h2))
# With only nine tokens, swapping a single word can flip a noticeable share of the
# 64 bits, so this distance can easily exceed 3. The threshold of 3 is intended for
# full-length pages, where one changed token barely moves the weighted sums.
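Comparing a new page's fingerprint with every stored fingerprint costs O(N). The sketch below shows a simplified form of the table-based lookup commonly used with SimHash, which relies on the pigeonhole principle. Split each 64-bit fingerprint into k + 1 disjoint bands. If two fingerprints differ in at most k bits, at least one band must match exactly, so only fingerprints sharing a band need the full Hamming check. The class name and its parameters are illustrative.

```python
from collections import defaultdict
from typing import DefaultDict, List, Set


class SimHashIndex:
    """Finds stored fingerprints within Hamming distance <= k of a query."""

    def __init__(self, k: int = 3, bits: int = 64):
        self.k = k
        self.bits = bits
        self.num_bands = k + 1
        self.band_width = bits // self.num_bands  # 16 bits for k=3, bits=64
        # one table per band: band value -> fingerprints that have that value
        self.tables: List[DefaultDict[int, Set[int]]] = [
            defaultdict(set) for _ in range(self.num_bands)
        ]

    def _bands(self, fp: int) -> List[int]:
        bands = []
        for i in range(self.num_bands):
            start = i * self.band_width
            # the last band absorbs any leftover bits
            width = self.bits - start if i == self.num_bands - 1 else self.band_width
            bands.append((fp >> start) & ((1 << width) - 1))
        return bands

    def add(self, fp: int) -> None:
        for table, band in zip(self.tables, self._bands(fp)):
            table[band].add(fp)

    def near_duplicates(self, fp: int) -> Set[int]:
        candidates: Set[int] = set()
        for table, band in zip(self.tables, self._bands(fp)):
            candidates |= table.get(band, set())
        # verify each candidate with the exact Hamming distance
        return {c for c in candidates if bin(c ^ fp).count("1") <= self.k}
```

The trade-off is memory: every fingerprint is stored once per band. In exchange, a lookup touches only the few buckets that share a band with the query.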

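5. Distributed Crawler Architecture

When a single-machine crawler cannot deliver the required throughput, a distributed architecture is needed. The core challenges are partitioning URLs across workers and coordinating the worker nodes.

5.1 Partitioning URLs by Host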

import bisect
import hashlib
from typing import Dict, List


class ConsistentHashRouter:
    """Consistent-hash router: maps domains to crawler worker nodes"""

    def __init__(self, num_workers: int, virtual_nodes: int = 150):
        self.virtual_nodes = virtual_nodes
        self.ring: Dict[int, int] = {}    # {ring position: worker_id}
        self.positions: List[int] = []    # sorted ring positions, for binary search
        for worker_id in range(num_workers):
            self.add_worker(worker_id)

    @staticmethod
    def _hash(key: str) -> int:
        # First 8 bytes of MD5 -> a position on a 64-bit ring
        return int.from_bytes(hashlib.md5(key.encode()).digest()[:8], 'big')

    def get_worker(self, domain: str) -> int:
        """Assign a worker node to a domain."""
        if not self.positions:
            raise RuntimeError('no workers on the ring')
        ring_pos = self._hash(domain)
        # First virtual node clockwise from the domain's position, found in O(log n)
        idx = bisect.bisect_left(self.positions, ring_pos)
        if idx == len(self.positions):
            idx = 0  # wrap around to the first node
        return self.ring[self.positions[idx]]

    def add_worker(self, worker_id: int):
        """Add a worker node dynamically."""
        for vnode in range(self.virtual_nodes):
            pos = self._hash(f"worker-{worker_id}-vnode-{vnode}")
            if pos not in self.ring:
                bisect.insort(self.positions, pos)
            self.ring[pos] = worker_id

    def remove_worker(self, worker_id: int):
        """Remove a worker node."""
        to_remove = {pos for pos, wid in self.ring.items() if wid == worker_id}
        for pos in to_remove:
            del self.ring[pos]
        self.positions = [p for p in self.positions if p not in to_remove]


# Advantages of consistent hashing:
# 1. Adding or removing a node moves only about 1/N of the domains (N = number of nodes)
# 2. While membership is stable, a domain is always handled by the same node
#    (enabling connection reuse and per-host rate limiting)
# 3. Virtual nodes spread the load more evenly
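The "only about 1/N moves" claim can be checked empirically. The self-contained sketch below rebuilds a small ring with the same MD5 positions and `worker-{id}-vnode-{n}` keys as the router above, maps 10,000 made-up domains, adds a fifth worker, and counts how many domains change owner.

```python
import bisect
import hashlib
from typing import List, Tuple


def _hash(key: str) -> int:
    return int.from_bytes(hashlib.md5(key.encode()).digest()[:8], "big")


def build_ring(workers: List[int], vnodes: int = 150) -> Tuple[List[int], List[Tuple[int, int]]]:
    ring = sorted((_hash(f"worker-{w}-vnode-{v}"), w) for w in workers for v in range(vnodes))
    return [pos for pos, _ in ring], ring


def lookup(positions: List[int], ring: List[Tuple[int, int]], domain: str) -> int:
    i = bisect.bisect_left(positions, _hash(domain))
    return ring[i % len(ring)][1]  # wrap around past the last position


domains = [f"site{i}.example" for i in range(10_000)]
before_pos, before_ring = build_ring(list(range(4)))
after_pos, after_ring = build_ring(list(range(5)))  # add worker 4

moved = {d for d in domains
         if lookup(before_pos, before_ring, d) != lookup(after_pos, after_ring, d)}
print(f"moved {len(moved) / len(domains):.1%} of domains")  # expect roughly 1/5

# Every moved domain lands on the new worker; old workers never trade domains
assert all(lookup(after_pos, after_ring, d) == 4 for d in moved)
```

With modulo hashing (`hash(domain) % N`), going from 4 to 5 workers would instead reassign about 80% of domains. Each reassigned domain would lose its warm connections and politeness state.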

5.2 Component Interaction in a Distributed Crawler

┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Crawler   │    │   Crawler   │    │   Crawler   │
│  Worker 1   │    │  Worker 2   │    │  Worker 3   │
└──────┬──────┘    └──────┬──────┘    └──────┬──────┘
       │ extracted URLs   │                  │
       ▼                  ▼                  ▼
┌────────────────────────────────────────────────────┐
│           Message Queue (Kafka/RabbitMQ)           │
│  Topic: extracted_urls  |  Topic: crawl_commands   │
└─────────────────────────┬──────────────────────────┘
                          │
             ┌────────────▼────────────┐
             │   URL Dedup & Router    │
             │ (Bloom Filter + CHash)  │
             └────────────┬────────────┘
                          │
             ┌────────────▼────────────┐
             │  URL Frontier (Redis)   │
             │ per-domain sorted sets  │
             └─────────────────────────┘

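Message queue design (Kafka):

Topic: extracted_urls
- Producer: Crawler Workers (newly parsed URLs)
- Consumer: URL Dedup & Router (deduplicates and routes URLs, then writes them to the Frontier)

Topic: crawl_commands
- Producer: URL Frontier Scheduler (URLs in the scheduler whose crawl time has arrived)
- Consumer: Crawler Workers (perform the fetch)

Partitioning strategy: partition by domain hash so that URLs for the same domain stay in order (Kafka guarantees ordering only within a partition).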
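The ordering property comes from using the domain as the message key. Kafka's default partitioner hashes the record key, so in practice you would simply set the host as the key. The function below is a hypothetical stand-in that uses MD5 to show why the same host always lands in the same append-only partition. Changing the partition count remaps keys in Kafka as well, so it should stay fixed once topics are in use.

```python
import hashlib
from collections import defaultdict
from typing import Dict, List
from urllib.parse import urlparse


def partition_for(url: str, num_partitions: int) -> int:
    """Hypothetical keyed partitioner: the message key is the URL's host."""
    host = urlparse(url).netloc.lower()
    digest = hashlib.md5(host.encode()).digest()  # stable across processes, unlike hash()
    return int.from_bytes(digest[:4], "big") % num_partitions


urls = [
    "https://news.example/a", "https://shop.example/1",
    "https://news.example/b", "https://shop.example/2",
    "https://news.example/c",
]

# Simulate producing in order: each partition is an append-only log
partitions: Dict[int, List[str]] = defaultdict(list)
for u in urls:
    partitions[partition_for(u, num_partitions=8)].append(u)

news_partition = partitions[partition_for("https://news.example/", 8)]
print([u for u in news_partition if "news.example" in u])
```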

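Part 2: Search Suggestion System

6. Trie: The Prefix-Matching Engine

The core data structure behind search suggestions is the Trie (prefix tree), which locates a prefix in O(k) time, where k is the length of the prefix. If each node also caches its Top-K suggestions, as in the implementation below, answering a query requires no subtree traversal.

6.1 Basic Trie Implementation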

from typing import Dict, List, Optional, Tuple


class TrieNode:
    def __init__(self):
        self.children: Dict[str, 'TrieNode'] = {}
        self.is_end: bool = False
        self.word: Optional[str] = None
        self.frequency: int = 0                       # search frequency
        self.top_results: List[Tuple[str, int]] = []  # Top-K (word, frequency) pairs in this subtree


class SearchTrie:
    """Search-suggestion Trie"""

    def __init__(self, top_k: int = 10):
        self.root = TrieNode()
        self.top_k = top_k

    def insert(self, word: str, frequency: int = 1):
        """Insert a term and add to its frequency."""
        node = self.root
        path = []

        for char in word.lower():
            if char not in node.children:
                node.children[char] = TrieNode()
            node = node.children[char]
            path.append(node)

        node.is_end = True
        node.word = word
        node.frequency += frequency

        # Walk back along the path and refresh every node's Top-K list
        for n in path:
            self._update_top_k(n, word, node.frequency)

    def search(self, prefix: str, limit: Optional[int] = None) -> List[str]:
        """Return the Top-K suggestions for a prefix."""
        limit = limit or self.top_k
        node = self._find_node(prefix.lower())
        if node is None:
            return []
        return [word for word, _ in node.top_results[:limit]]

    def _find_node(self, prefix: str) -> Optional[TrieNode]:
        """Find the node for a prefix."""
        node = self.root
        for char in prefix:
            if char not in node.children:
                return None
            node = node.children[char]
        return node

    def _update_top_k(self, node: TrieNode, word: str, frequency: int):
        """Update a node's Top-K high-frequency suggestions."""
        # Replace the entry if the word is already listed, otherwise append it
        for i, (w, _) in enumerate(node.top_results):
            if w == word:
                node.top_results[i] = (word, frequency)
                break
        else:
            node.top_results.append((word, frequency))

        # Sort by frequency descending and keep only the Top-K
        node.top_results = sorted(
            node.top_results, key=lambda x: x[1], reverse=True
        )[:self.top_k]

    def fuzzy_search(self, prefix: str, max_edit_distance: int = 2) -> List[str]:
        """Fuzzy search: prefix matching that tolerates typos.
        The search is exhaustive, so its cost grows quickly with max_edit_distance."""
        best: Dict[str, int] = {}  # word -> best score across all matching paths
        self._dfs_fuzzy(self.root, prefix.lower(), 0, 0, max_edit_distance, best)
        ranked = sorted(best.items(), key=lambda x: x[1], reverse=True)
        return [word for word, _ in ranked[:self.top_k]]

    def _dfs_fuzzy(self, node: TrieNode, target: str, pos: int,
                   edits: int, max_edits: int, best: Dict[str, int]):
        """DFS for error-tolerant matches."""
        if edits > max_edits:
            return

        # Match found: the whole prefix has been consumed
        if pos == len(target):
            # Every word in this subtree is a candidate; a heuristic penalty of
            # 1000 per edit ranks exact matches above corrected ones
            self._collect_subtree(node, best, penalty=edits * 1000)
            return

        char = target[pos]

        # 1. Exact match
        if char in node.children:
            self._dfs_fuzzy(node.children[char], target, pos + 1, edits, max_edits, best)

        # 2. Deletion: skip the current target character (the user typed an extra one)
        self._dfs_fuzzy(node, target, pos + 1, edits + 1, max_edits, best)

        for child_char, child_node in node.children.items():
            # 3. Substitution
            if child_char != char:
                self._dfs_fuzzy(child_node, target, pos + 1, edits + 1, max_edits, best)
            # 4. Insertion: skip the current Trie character (the user omitted it)
            self._dfs_fuzzy(child_node, target, pos, edits + 1, max_edits, best)

    def _collect_subtree(self, node: TrieNode, best: Dict[str, int], penalty: int = 0):
        """Collect every complete word in the subtree, keeping each word's best score."""
        if node.is_end:
            score = node.frequency - penalty
            if score > best.get(node.word, float('-inf')):
                best[node.word] = score
        for child in node.children.values():
            self._collect_subtree(child, best, penalty)
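Caching `top_results` at every node keeps a lookup at O(prefix length + K). The price is memory and extra work on every insert, since each node on the word's path is re-sorted. For contrast, the sketch below computes Top-K only at query time. It walks the whole subtree under the prefix and picks the K most frequent words with `heapq.nlargest`. It uses less memory, but a one-letter prefix may touch a large fraction of the dictionary. The class names are illustrative and not part of the design above.

```python
import heapq
from typing import Dict, List, Optional, Tuple


class PlainTrieNode:
    def __init__(self):
        self.children: Dict[str, "PlainTrieNode"] = {}
        self.frequency = 0               # > 0 marks the end of a stored query
        self.word: Optional[str] = None


class PlainTrie:
    """Trie WITHOUT cached per-node Top-K: each query walks the whole subtree."""

    def __init__(self):
        self.root = PlainTrieNode()

    def insert(self, word: str, frequency: int = 1) -> None:
        node = self.root
        for ch in word.lower():
            node = node.children.setdefault(ch, PlainTrieNode())
        node.word = word
        node.frequency += frequency

    def top_k(self, prefix: str, k: int = 10) -> List[str]:
        node = self.root
        for ch in prefix.lower():
            node = node.children.get(ch)
            if node is None:
                return []
        # gather (frequency, word) for every word under the prefix: O(subtree size)
        found: List[Tuple[int, str]] = []
        stack = [node]
        while stack:
            cur = stack.pop()
            if cur.frequency > 0:
                found.append((cur.frequency, cur.word))
            stack.extend(cur.children.values())
        return [w for _, w in heapq.nlargest(k, found)]
```

Production systems usually sit between these extremes. They cache Top-K only on shallow, high-traffic nodes and compute the rest on demand.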

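6.2 Edit Distance (Damerau-Levenshtein)

Damerau-Levenshtein distance adds one operation to standard edit distance (insertion, deletion, substitution): transposing two adjacent characters. This makes it a better model of common typing errors. The implementation below is the restricted variant, also known as optimal string alignment (OSA) distance, which never edits a substring more than once.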

class DamerauLevenshtein:
    """Damerau-Levenshtein edit distance (restricted / optimal string alignment variant)"""

    def distance(self, s1: str, s2: str) -> int:
        len1, len2 = len(s1), len(s2)

        # dp[i][j] = edit distance between s1[:i] and s2[:j]
        dp = [[0] * (len2 + 1) for _ in range(len1 + 1)]

        for i in range(len1 + 1):
            dp[i][0] = i
        for j in range(len2 + 1):
            dp[0][j] = j

        for i in range(1, len1 + 1):
            for j in range(1, len2 + 1):
                cost = 0 if s1[i-1] == s2[j-1] else 1

                dp[i][j] = min(
                    dp[i-1][j] + 1,       # deletion
                    dp[i][j-1] + 1,       # insertion
                    dp[i-1][j-1] + cost   # substitution
                )

                # Transposition of adjacent characters (the Damerau extension)
                if i > 1 and j > 1 and s1[i-1] == s2[j-2] and s1[i-2] == s2[j-1]:
                    dp[i][j] = min(dp[i][j], dp[i-2][j-2] + 1)

        return dp[len1][len2]


# Example:
dl = DamerauLevenshtein()
print(dl.distance('teh', 'the'))         # 1 (one adjacent swap; standard Levenshtein gives 2)
print(dl.distance('kitten', 'sitting'))  # 3
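The OSA variant above can overestimate when a transposed pair also needs an insertion between its characters. For "CA" → "ABC", OSA returns 3, while the unrestricted distance is 2 (CA → AC → ABC). If that matters, the unrestricted Damerau-Levenshtein distance can be computed with the Lowrance-Wagner recurrence, sketched below. It tracks the last row in which each character was seen, so a transposition can reach back past intervening edits.

```python
from typing import Dict


def damerau_levenshtein(a: str, b: str) -> int:
    """Unrestricted Damerau-Levenshtein distance (Lowrance-Wagner recurrence)."""
    maxdist = len(a) + len(b)
    # d[i + 1][j + 1] = distance between a[:i] and b[:j]; row/column 0 are
    # sentinels holding maxdist so impossible transpositions never win.
    d = [[0] * (len(b) + 2) for _ in range(len(a) + 2)]
    d[0][0] = maxdist
    for i in range(len(a) + 1):
        d[i + 1][0] = maxdist
        d[i + 1][1] = i
    for j in range(len(b) + 1):
        d[0][j + 1] = maxdist
        d[1][j + 1] = j

    last_row: Dict[str, int] = {}  # last row (1-based) in which each char of a appeared
    for i in range(1, len(a) + 1):
        last_match_col = 0         # last column in this row where a[i-1] == b[j-1]
        for j in range(1, len(b) + 1):
            k = last_row.get(b[j - 1], 0)
            l = last_match_col
            if a[i - 1] == b[j - 1]:
                cost = 0
                last_match_col = j
            else:
                cost = 1
            d[i + 1][j + 1] = min(
                d[i][j] + cost,                            # substitution / match
                d[i + 1][j] + 1,                           # insertion
                d[i][j + 1] + 1,                           # deletion
                d[k][l] + (i - k - 1) + 1 + (j - l - 1),   # transposition
            )
        last_row[a[i - 1]] = i
    return d[len(a) + 1][len(b) + 1]


print(damerau_levenshtein("CA", "ABC"))  # 2
print(damerau_levenshtein("teh", "the"))  # 1
```

For ranking typo corrections of short queries, the two variants rarely differ. OSA is simpler and is the one most spelling-correction code uses.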

7. Real-Time Updates and Trending Queries

7.1 Real-Time Update Pipeline Architecture

┌─────────────┐     ┌───────────┐     ┌──────────────────┐
│ Search logs │────▶│   Kafka   │────▶│   Flink/Spark    │
│ (frontend)  │     │ (buffer)  │     │ (real-time agg.) │
└─────────────┘     └───────────┘     └────────┬─────────┘
                                               │
                        ┌──────────────────────┘
                        ▼
                   ┌──────────┐     ┌──────────────────┐
                   │  Redis   │────▶│   Trie Builder   │
                   │(hot list)│     │(periodic rebuild)│
                   └──────────┘     └──────────────────┘

Storing trending queries in a Redis sorted set:

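import time
import uuid
from typing import List, Tuple

import redis  # pip install redis


class HotQueriesManager:
    """Trending-query management on Redis sorted sets, one key per time bucket"""

    def __init__(self, redis_client: redis.Redis,
                 bucket_seconds: int = 3600, window_buckets: int = 24):
        self.redis = redis_client
        self.bucket_seconds = bucket_seconds  # size of one time bucket (seconds)
        self.window_buckets = window_buckets  # sliding window = this many buckets
        # On Redis Cluster, ZUNIONSTORE needs all keys in one hash slot;
        # a hash-tagged prefix such as '{hot_queries}' achieves that.
        self.key_prefix = 'hot_queries'

    def _bucket(self, ts: float) -> int:
        return int(ts // self.bucket_seconds)

    def _key(self, bucket: int) -> str:
        return f'{self.key_prefix}:{bucket}'

    @staticmethod
    def _text(value) -> str:
        # redis-py returns bytes unless the client uses decode_responses=True
        return value.decode() if isinstance(value, bytes) else value

    def record_search(self, query: str):
        """Record one search."""
        key = self._key(self._bucket(time.time()))
        pipe = self.redis.pipeline()
        pipe.zincrby(key, 1, query)  # ZINCRBY increments the score atomically
        # keep each bucket one bucket longer than the window so trend checks can read it
        pipe.expire(key, self.bucket_seconds * (self.window_buckets + 1))
        pipe.execute()

    def get_top_queries(self, top_n: int = 100) -> List[Tuple[str, float]]:
        """Return the top queries over the sliding window."""
        current = self._bucket(time.time())
        keys = [self._key(current - i) for i in range(self.window_buckets)]

        # Merge the window's buckets with ZUNIONSTORE into a uniquely named temp key
        temp_key = f'{self.key_prefix}:temp:{uuid.uuid4().hex}'
        pipe = self.redis.pipeline()
        pipe.zunionstore(temp_key, keys, aggregate='SUM')
        pipe.zrevrange(temp_key, 0, top_n - 1, withscores=True)  # Top-N
        pipe.delete(temp_key)                                    # clean up the temp key
        _, results, _ = pipe.execute()

        return [(self._text(q), score) for q, score in results]

    def get_trending_queries(self, lookback_buckets: int = 1,
                             min_growth: float = 2.0) -> List[Tuple[str, float, float]]:
        """Return queries whose counts are surging (burst detection)."""
        current = self._bucket(time.time())
        current_data = self.redis.zrevrange(self._key(current), 0, -1, withscores=True)
        previous_data = {
            self._text(q): score
            for q, score in self.redis.zrevrange(
                self._key(current - lookback_buckets), 0, -1, withscores=True)
        }

        # Growth rate against the earlier bucket. Queries absent from it are skipped
        # to avoid dividing by zero, so brand-new queries never surface here.
        trending = []
        for q, count in current_data:
            query = self._text(q)
            prev_count = previous_data.get(query, 0)
            if prev_count > 0:
                growth = (count - prev_count) / prev_count
                if growth > min_growth:  # more than 200% growth by default
                    trending.append((query, growth, count))

        trending.sort(key=lambda x: x[1], reverse=True)
        return trending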
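
7.2 Flink Real-Time Aggregation Example

The same aggregation can run as a streaming job. The following is Java pseudocode: SearchLog, QueryCount, Alert, CountAggregator, QueryCountMapper and TrendDetectionProcessFunction are application classes that are not shown.

// Flink real-time search-log aggregation (Java pseudocode)
// Newer Flink versions replace FlinkKafkaConsumer with KafkaSource;
// RedisSink comes from the Apache Bahir Redis connector.
DataStream<SearchLog> searchStream = env
    .addSource(new FlinkKafkaConsumer<>("search_logs", ...));

// Count each query over a 10-minute window that slides every minute
DataStream<QueryCount> aggregated = searchStream
    .keyBy(SearchLog::getQuery)
    .window(SlidingProcessingTimeWindows.of(
        Time.minutes(10), Time.minutes(1)))
    .aggregate(new CountAggregator());

// Write the counts to Redis
aggregated.addSink(new RedisSink<>(config, new QueryCountMapper()));

// Burst detection
DataStream<Alert> alerts = aggregated
    .keyBy(QueryCount::getQuery)
    .process(new TrendDetectionProcessFunction())
    .filter(alert -> alert.growthRate > 2.0);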
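To see what the sliding window computes without a Flink cluster, here is a small in-memory Python analogue. It assumes events arrive in time order (processing-time semantics) and keeps one counter per 1-minute bucket. Summing the last 10 buckets then approximates the 10-minute window that slides every minute. It has none of Flink's state management or fault tolerance, and the class name is illustrative.

```python
from collections import Counter, deque
from typing import Deque, List, Tuple


class SlidingWindowCounter:
    """Query counts over the last `window_buckets` buckets of `bucket_seconds` each."""

    def __init__(self, bucket_seconds: int = 60, window_buckets: int = 10):
        self.bucket_seconds = bucket_seconds
        self.window_buckets = window_buckets
        self.buckets: Deque[Tuple[int, Counter]] = deque()  # (bucket_id, counts), oldest first

    def record(self, query: str, ts: float) -> None:
        bucket_id = int(ts // self.bucket_seconds)
        if not self.buckets or self.buckets[-1][0] != bucket_id:
            self.buckets.append((bucket_id, Counter()))  # assumes ts never goes backwards
        self.buckets[-1][1][query] += 1
        self._evict(bucket_id)

    def _evict(self, current_bucket: int) -> None:
        # drop buckets that have slid out of the window
        while self.buckets and self.buckets[0][0] <= current_bucket - self.window_buckets:
            self.buckets.popleft()

    def top(self, n: int, now: float) -> List[Tuple[str, int]]:
        self._evict(int(now // self.bucket_seconds))
        total: Counter = Counter()
        for _, counts in self.buckets:
            total.update(counts)
        return total.most_common(n)
```

Comparing `top()` results from two consecutive windows gives the same growth-rate signal that the trend-detection step computes.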

8. Overall Architecture and Performance Optimization

8.1 Complete Search Suggestion Architecture

                    ┌──────────────────┐
                    │  Load Balancer   │
                    └────────┬─────────┘
                             │
              ┌──────────────┼──────────────┐
              ▼              ▼              ▼
         ┌──────────┐   ┌──────────┐   ┌──────────┐
         │ API Svr  │   │ API Svr  │   │ API Svr  │
         └────┬─────┘   └────┬─────┘   └────┬─────┘
              │              │              │
              └──────────────┼──────────────┘
                             │
              ┌──────────────▼──────────────┐
              │         Local Cache         │
              │   (Caffeine/Guava Cache)    │
              │        TTL: 5 minutes       │
              └──────────────┬──────────────┘
                             │ (cache miss)
              ┌──────────────▼──────────────┐
              │        Redis Cluster        │
              │ - hot queries (sorted set)  │
              │ - pre-computed top K        │
              └──────────────┬──────────────┘
                             │
              ┌──────────────▼──────────────┐
              │     Trie Service (gRPC)     │
              │ - in-memory compressed trie │
              │ - periodic rebuild from DB  │
              └─────────────────────────────┘

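Optimization techniques:

  1. Tiered caching: L1 local cache (Caffeine, 5-minute TTL) → L2 Redis → L3 Trie Service
  2. Trie compression: use a Radix Tree (compressed prefix tree) to reduce memory usage
  3. Trie partitioning: shard the Trie across instances by first character to reduce per-instance memory pressure
  4. Precomputation: precompute the Top-10 results for high-frequency prefixes and cache them in Redis
  5. Asynchronous rebuilds: update the Trie with a double-buffer scheme, loading new data into a standby tree and swapping it in once it is complete
  6. Degradation strategy: when the Trie Service is unavailable, fall back to fuzzy matching over the precomputed results in Redis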
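The tiered-caching item can be sketched in a few lines. In this minimal Python version, `TTLCache` is a tiny stand-in for Caffeine/Guava. `l2_get` (e.g. a Redis lookup of precomputed Top-K) and `trie_lookup` (the Trie Service call) are hypothetical callables supplied by the caller, so the tiers can be swapped or mocked. The injectable clock exists only to make expiry testable.

```python
import time
from typing import Callable, Dict, List, Optional, Tuple


class TTLCache:
    """Tiny L1 cache with per-entry expiry (stand-in for Caffeine/Guava)."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock
        self.data: Dict[str, Tuple[float, List[str]]] = {}

    def get(self, key: str) -> Optional[List[str]]:
        entry = self.data.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self.clock() >= expires_at:
            del self.data[key]  # lazily drop the expired entry
            return None
        return value

    def put(self, key: str, value: List[str]) -> None:
        self.data[key] = (self.clock() + self.ttl, value)


def suggest(prefix: str, l1: TTLCache,
            l2_get: Callable[[str], Optional[List[str]]],
            trie_lookup: Callable[[str], List[str]]) -> List[str]:
    """L1 local cache -> L2 (hypothetical Redis Top-K lookup) -> L3 Trie Service."""
    hit = l1.get(prefix)
    if hit is not None:
        return hit
    result = l2_get(prefix)
    if result is None:
        result = trie_lookup(prefix)  # slowest tier
    l1.put(prefix, result)            # backfill L1 so the next request stays local
    return result
```

A 5-minute TTL bounds how stale a suggestion can be on any one API server. Because L1 absorbs repeated hot prefixes, most traffic never leaves the process.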

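8.2 Double-Buffered Trie Updates

import threading
from typing import Dict, List, Tuple

# SearchTrie is the class defined in section 6.1.


class DoubleBufferTrieService:
    """Double-buffered Trie service: lock-free reads, hot swap after a rebuild"""

    def __init__(self):
        self.active_trie = SearchTrie()             # the Trie currently serving queries
        self.lock = threading.Lock()                # serializes writers (swap and increments)
        self._rebuilding = False
        self._pending: List[Tuple[str, int]] = []   # increments that arrive mid-rebuild

    def query(self, prefix: str) -> List[str]:
        """Query the active Trie. Readers take no lock: rebinding self.active_trie
        is a single atomic reference assignment in CPython, so a reader sees the
        old Trie or the new one, never a half-built one."""
        return self.active_trie.search(prefix)

    def rebuild(self, word_frequencies: Dict[str, int]):
        """Build a standby Trie off to the side, then swap it in. Assumes one
        rebuild at a time, with word_frequencies snapshotted before it starts."""
        with self.lock:
            self._rebuilding = True
            self._pending = []

        standby = SearchTrie()  # queries keep hitting the active Trie meanwhile
        for word, freq in word_frequencies.items():
            standby.insert(word, freq)

        with self.lock:
            # Replay increments that the snapshot did not include
            for word, delta in self._pending:
                standby.insert(word, delta)
            self._pending = []
            self._rebuilding = False
            self.active_trie = standby  # atomic swap; the old Trie is garbage-collected

    def incremental_update(self, word: str, frequency_delta: int):
        """Incremental update: applied to the active Trie immediately and
        remembered for replay if a rebuild is in progress."""
        with self.lock:
            self.active_trie.insert(word, frequency_delta)
            if self._rebuilding:
                self._pending.append((word, frequency_delta))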

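The crawler and the search suggestion system look independent but are closely related. The crawler fetches massive amounts of data from the web and builds the index. At the other end, search suggestions give users instant, intelligent query guidance that lowers the barrier to searching. Building either system well depends on a solid grasp of data structures (Trie, Bloom Filter, SimHash) and on distributed-systems engineering practice (Kafka, Redis, Flink). With these core principles understood, the designs above can be trimmed and combined to fit the scale of a particular business.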

Author: Leo·Cheung
Link: http://tufusi.com/2020/04/05/%E7%B3%BB%E7%BB%9F%E8%AE%BE%E8%AE%A1%E4%B9%8B%E7%88%AC%E8%99%AB%E7%B3%BB%E7%BB%9F%E4%B8%8E%E6%90%9C%E7%B4%A2%E5%BB%BA%E8%AE%AE%E7%B3%BB%E7%BB%9F/
Copyright: Unless otherwise stated, all articles on this blog are licensed under CC BY-NC-SA 4.0. Please credit ONE·PIECE when reposting.