🧠 布隆过滤器详解与Python实现
1. 什么是布隆过滤器?
布隆过滤器(Bloom Filter)是由 Burton Howard Bloom 在 1970 年提出的一种空间效率极高的概率型数据结构。它用于判断一个元素是否在一个集合中。这种判断是概率性的:它可能会误判(False Positive,即告诉你某个元素存在而实际上并不存在),但它绝不会漏判(False Negative,即如果它说某个元素不存在,那就一定不存在)。
它的核心是一个二进制位数组(Bit Array)(初始值全为0)和一系列哈希函数。由于其概率性和空间效率高的特点,它非常适合那些可以接受一定误判率,但对内存和查询速度有高要求的场景。
2. 为什么要使用布隆过滤器及其使用场景
✨ 优点与价值
布隆过滤器在空间效率和查询时间上通常都远超其他算法,其优点包括:
- 极高的空间效率:不需要存储元素本身,只需一个位数组,占用内存极少。
- 极快的查询速度:查询时间复杂度为 O(k),k 是哈希函数个数,通常很小。
- 保密性强:由于不存储原始数据,在某些场景下有利于数据安全。
🎯 常见应用场景
布隆过滤器非常适合以下场景:
- 缓存穿透防护:在查询缓存之前,先用布隆过滤器判断数据是否存在,避免大量请求直接访问数据库(查询根本不存在的key)。
- 网页爬虫 URL 去重:快速判断一个 URL 是否已被爬取,避免重复工作和循环抓取。
- 垃圾邮件过滤:判断发件人地址是否在垃圾邮件列表中。
- 大规模数据集快速查找:如判断用户ID是否属于某个特定群体,尽管可能存在误判,但能快速排除大量不在集合中的元素。
⚠️ 缺点与注意事项
布隆过滤器也存在一些缺点:
- 存在误判率:这是其概率性本质决定的。误判率会随着元素的增加而上升,但可以通过调整参数来控制。
- 不支持删除操作:传统布隆过滤器很难安全地删除元素,因为删除一个元素(将其对应位置0)可能会影响其他映射到相同位的元素。虽然有计数布隆过滤器(Counting Bloom Filter)(用计数器代替位)的变体支持删除,但会增加空间开销。
3. 布隆过滤器原理详解
布隆过滤器的核心是一个很长的二进制位数组(Bit Array) 和一系列随机映射函数(哈希函数)。
🔧 工作流程
- 初始化:创建一个长度为
m
的位数组,所有位初始化为 0。同时选择k
个独立且均匀分布的哈希函数。 - 添加元素:当要添加一个元素时,用
k
个哈希函数分别计算该元素的哈希值,得到k
个位置(索引)。将位数组中这些位置的值都置为 1。 - 查询元素:当要查询一个元素是否存在时,同样用这
k
个哈希函数计算哈希值,得到k
个位置: 如果这些位置有任何一位为 0,则该元素肯定不存在于集合中。 如果这些位置全部为 1,则该元素可能存在于集合中(由于哈希冲突,存在误判的可能)。
📊 参数选择与误判率
布隆过滤器的误判率主要受三个因素影响:
- 位数组大小 (
m
):m
越大,误判率越低,但消耗的内存也越大。 - 哈希函数数量 (
k
):k
过多或过少都会导致误判率升高。 - 已插入元素数量 (
n
):插入的元素越多,位数组中1的比例越高,误判率也随之升高。
可以根据预期元素数量 n
和期望的误判率 p
来估算最优的 m
和 k
:

4. Python实现本地缓存版布隆过滤器(使用mmh3和bitarray)
使用 mmh3
和 bitarray
库来实现功能相对完善的布隆过滤器。
🧰 环境要求
-
Python 3.6+
-
安装
mmh3
和bitarray
库:pip install mmh3 bitarray
🖥️ 代码实现
# bloom_local_byself.py
import math
import mmh3
from bitarray import bitarray
from typing import Any, List
import array
class LocalBloomFilter:
"""
使用 mmh3 和 bitarray 实现的布隆过滤器。
"""
def __init__(self, items_count: int, false_positive_prob: float = 0.01):
"""
初始化布隆过滤器。
Args:
items_count (int): 预期要插入的元素数量。
false_positive_prob (float): 可接受的误判率(0到1之间)。默认 0.01 (1%)。
"""
if not 0 < false_positive_prob < 1:
raise ValueError("false_positive_prob 必须在0和1之间")
if items_count <= 0:
raise ValueError("items_count 必须大于0")
self.n = items_count
self.p = false_positive_prob
# 计算最优的位数组大小和哈希函数数量
self.m = self._calculate_m(self.n, self.p)
self.k = self._calculate_k(self.m, self.n)
# 初始化位数组
self.bit_array = bitarray(self.m)
self.bit_array.setall(0)
self._elements_added = 0
def _calculate_m(self, n: int, p: float) -> int:
"""计算位数组大小 m"""
m = - (n * math.log(p)) / (math.log(2) ** 2)
return int(m)
def _calculate_k(self, m: int, n: int) -> int:
"""计算哈希函数数量 k"""
k = (m / n) * math.log(2)
return max(1, int(round(k))) # 至少为1
def _get_hash_indexes(self, item: Any) -> List[int]:
"""
计算一个元素对应的k个位索引。
使用 mmh3 哈希函数,通过不同的种子模拟多个哈希函数。
"""
indexes = []
for i in range(self.k):
# 使用不同的种子(i)来模拟不同的哈希函数
hash_val = mmh3.hash(str(item), i) % self.m
indexes.append(hash_val)
return indexes
def add(self, item: Any) -> None:
"""
将一个元素添加到布隆过滤器中。
"""
indexes = self._get_hash_indexes(item)
for index in indexes:
self.bit_array[index] = 1
self._elements_added += 1
def contains(self, item: Any) -> bool:
"""
检查一个元素是否可能在布隆过滤器中。
返回:
True: 元素可能存在(存在误判可能)。
False: 元素一定不存在。
"""
indexes = self._get_hash_indexes(item)
for index in indexes:
if self.bit_array[index] == 0:
return False
return True
def __contains__(self, item: Any) -> bool:
"""支持 'item in filter' 语法"""
return self.contains(item)
@property
def size(self) -> int:
"""返回位数组大小"""
return self.m
@property
def hash_count(self) -> int:
"""返回哈希函数数量"""
return self.k
@property
def elements_added(self) -> int:
"""返回已添加的元素数量"""
return self._elements_added
@property
def current_false_positive_prob(self) -> float:
"""计算当前的误判率(理论估算值)"""
# 公式: (1 - e^(-k * n / m)) ^ k
n = self._elements_added
m = self.m
k = self.k
return pow(1 - math.exp(-k * n / m), k)
def __str__(self) -> str:
"""返回布隆过滤器的一些基本信息"""
return (f"BloomFilter(位数组大小={self.m}, "
f"哈希函数数={self.k}, "
f"已添加元素≈{self._elements_added}, "
f"当前理论误判率≈{self.current_false_positive_prob:.6f})")
# 示例用法和测试
if __name__ == "__main__":
# 创建一个布隆过滤器,预期插入1000个元素,目标误判率1%
bloom_filter = LocalBloomFilter(1000, 0.01)
print(bloom_filter)
print("-" * 50)
# 添加一些元素
items_to_add = ["apple", "banana", "cherry", "date", "elderberry"]
for item in items_to_add:
bloom_filter.add(item)
print(f"添加了: '{item}'")
print("-" * 50)
print(f"添加后: {bloom_filter}")
# 检查元素是否存在
test_items = ["apple", "banana", "fig", "grape"]
for item in test_items:
result = bloom_filter.contains(item)
status = "可能存在" if result else "肯定不存在"
print(f"检查 '{item}': {status}")
# 使用 `in` 语法
print("\n使用 'in' 语法检查:")
print(f"'cherry' in filter? {'cherry' in bloom_filter}")
print(f"'watermelon' in filter? {'watermelon' in bloom_filter}")
说明:
- 这个实现使用
mmh3
作为哈希函数,bitarray
来处理位操作,性能比纯Python列表实现要好很多。 - 哈希函数通过向
mmh3.hash
传递不同的种子(i
)来模拟多个哈希函数。 - 类提供了计算最优
m
和k
的方法,并可以估算当前的误判率。
5. Python借助第三方库pybloom_live实现布隆过滤器
使用 pybloom_live
可以轻松实现强大且高效的布隆过滤器。
🧰 环境要求
-
Python 3.6+
-
安装
pybloom_live
库:pip install pybloom_live
🖥️ 代码实现
pybloom_live
提供了两种布隆过滤器:BloomFilter
(固定大小)和 ScalableBloomFilter
(可扩展,能自动应对超出预期数量的元素,但误判率可能会上升)。
# bloom_local_bythird.py
from pybloom_live import BloomFilter, ScalableBloomFilter
# 1. 使用固定大小的 BloomFilter
# 参数:容量 capacity, 误判率 error_rate
bf = BloomFilter(capacity=10000, error_rate=0.001)
print(f"固定布隆过滤器:容量={bf.capacity}, 误判率={bf.error_rate}")
# 添加元素
bf.add("www.example.com")
bf.add("www.google.com")
# 检查元素
print(f"Contains 'www.example.com': {'www.example.com' in bf}") # True
print(f"Contains 'www.github.com': {'www.github.com' in bf}") # False (很可能)
# 当前元素数量 (近似值)
print(f"Approximate number of items: {len(bf)}")
# 2. 使用可扩展的 ScalableBloomFilter
# 参数:初始模式 mode (SMALL_SET_GROWTH or LARGE_SET_GROWTH), 初始误判率 error_rate
sbf = ScalableBloomFilter(initial_capacity=100, mode=ScalableBloomFilter.SMALL_SET_GROWTH, error_rate=0.001)
# 添加大量元素,超出初始容量也没关系
for i in range(1000):
sbf.add(f"item_{i}")
# 检查元素
print(f"Contains 'item_999': {'item_999' in sbf}") # True
print(f"Contains 'item_2000': {'item_2000' in sbf}") # False
print(f"Approximate number of items in scalable filter: {len(sbf)}")
说明:
pybloom_live
库经过优化,性能很好。BloomFilter
在元素数量超过capacity
后,误判率会急剧上升。ScalableBloomFilter
通过使用多个过滤器层来自动扩展,更适合不确定最终元素数量的场景。
要注意,
pybloom_live
中的过滤器对象未实现contains
方法,但支持in
语法进行判断
6. Python实现基于Redis (5.0+) 的布隆过滤器
Redis 5.0及以上版本可以通过加载 RedisBloom 模块来获得原生的布隆过滤器支持,这是官方推荐的方案,性能和功能都最好。
🧰 环境要求
-
Redis Server 5.0+
-
RedisBloom 模块已安装并加载。使用Docker部署(直接部署带有过滤器模块的版本):
docker run -p 6379:6379 --name redis-redisbloom redislabs/rebloom:latest
-
Python 3.6+
-
Redis Python客户端 和 redisbloom 库:
pip install redis redisbloom
🖥️ 代码实现
# bloom_redis_with_bloom.py
from redisbloom.client import Client
class RedisBloomFilter:
"""
使用 RedisBloom 模块的布隆过滤器封装。
适用于 Redis 5.0+ 并已加载 RedisBloom 模块。
"""
def __init__(self, redis_client: Client, filter_name: str, capacity: int, error_rate: float = 0.01):
"""
初始化Redis布隆过滤器。
Args:
redis_client (Client): 已连接的RedisBloom客户端实例。
filter_name (str): 布隆过滤器在Redis中的键名。
capacity (int): 预期容量。
error_rate (float): 可接受的误判率。
"""
self.client = redis_client
self.filter_name = filter_name
self.capacity = capacity
self.error_rate = error_rate
# 显式创建过滤器。如果已存在,则忽略。
# BF.RESERVE key error_rate capacity
try:
self.client.bfCreate(self.filter_name, self.error_rate, self.capacity)
except Exception as e:
# 常见的错误是过滤器已存在,这通常可以忽略
if 'exists' not in str(e).lower():
print(f"过滤器可能已存在或创建时出错(可忽略): {e}")
def add(self, item) -> bool:
"""
添加一个元素到布隆过滤器。
Returns:
bool: 如果元素是新添加的(可能之前不存在),返回True;如果可能已经存在,返回False。
注意:由于布隆过滤器的特性,返回False不一定代表之前一定存在。
"""
# BF.ADD key item
return self.client.bfAdd(self.filter_name, item)
def add_multi(self, items: list) -> list:
"""
批量添加元素。
Returns:
list: 每个元素的添加结果列表 (1表示新添加, 0表示可能已存在)。
"""
# BF.MADD key item1 item2 ...
return self.client.bfMAdd(self.filter_name, *items)
def contains(self, item) -> bool:
"""
检查元素是否可能在布隆过滤器中。
Returns:
bool: True表示可能存在,False表示肯定不存在。
"""
# BF.EXISTS key item
return self.client.bfExists(self.filter_name, item)
def contains_multi(self, items: list) -> list:
"""
批量检查元素。
Returns:
list: 每个元素的存在结果列表 (1表示可能存在, 0表示肯定不存在)。
"""
# BF.MEXISTS key item1 item2 ...
return self.client.bfMExists(self.filter_name, *items)
def info(self) -> dict:
"""
获取布隆过滤器的信息。
Returns:
dict: 包含容量、大小、哈希函数数量等信息的字典。
"""
# BF.INFO key
return self.client.bfInfo(self.filter_name)
# 示例用法
if __name__ == "__main__":
import redis
# 方式1:使用 redisbloom 的 Client
rb_client = Client(host='localhost', port=6379, db=0)
# 方式2:也可以使用标准的redis客户端,但命令需要稍作调整
# standard_client = redis.Redis(host='localhost', port=6379, db=0)
# rb_client = Client.from_redis(standard_client)
# 创建过滤器实例
filter_name = "my_redis_bloom_filter"
capacity = 10000
error_rate = 0.001
redis_bloom = RedisBloomFilter(rb_client, filter_name, capacity, error_rate)
# 添加元素
print(f"Add 'apple': {redis_bloom.add('apple')}") # 可能是 True (新添加)
print(f"Add 'banana': {redis_bloom.add('banana')}") # 可能是 True
print(f"Add 'apple' again: {redis_bloom.add('apple')}") # 可能是 False (可能已存在)
# 检查元素
print(f"Contains 'apple': {redis_bloom.contains('apple')}") # True
print(f"Contains 'orange': {redis_bloom.contains('orange')}") # False
# 批量操作
items = ['grape', 'kiwi', 'banana']
add_results = redis_bloom.add_multi(items)
print(f"Batch add results: {add_results}")
check_items = ['apple', 'kiwi', 'mango']
exist_results = redis_bloom.contains_multi(check_items)
print(f"Batch check results: {exist_results}")
# 获取信息
info = redis_bloom.info()
print(f"Filter Info: Capacity={info.get('capacity')}, Size={info.get('size')}, Hashes={info.get('numHashes')}")
7. Python实现基于Redis (5.0以下版本) 的布隆过滤器
对于旧版Redis,没有原生命令,但可以利用Redis的 SETBIT
和 GETBIT
命令,在Python端实现哈希计算,手动模拟一个布隆过滤器,适用于任何版本。
🧰 环境要求
-
Redis Server (任何版本,支持
SETBIT
/GETBIT
即可)。使用Docker部署:docker run -p 6379:6379 --name redis-old redis:4.0-alpine
-
Python 3.6+
-
Redis Python客户端:
pip install redis
🖥️ 代码实现
# bloom_redis_without_bloom.py
import math
import hashlib
import redis
from typing import Any, List
class RedisLegacyBloomFilter:
"""
用于 Redis 5.0 以下版本的手动实现布隆过滤器。
在Python端计算哈希和位偏移,使用Redis的位图(bitmap)存储。
"""
def __init__(self, redis_client: redis.Redis, filter_key: str, capacity: int, error_rate: float = 0.01):
"""
初始化旧版Redis布隆过滤器。
Args:
redis_client (redis.Redis): 已连接的Redis客户端实例。
filter_key (str): 用于存储位图的Redis键名。
capacity (int): 预期容量。
error_rate (float): 可接受的误判率。
"""
self.client = redis_client
self.filter_key = filter_key
self.capacity = capacity
self.error_rate = error_rate
# 计算参数
self.m = self._calculate_m(self.capacity, self.error_rate) # 位数组大小
self.k = self._calculate_k(self.m, self.capacity) # 哈希函数数量
print(f"Manual Redis Bloom Filter params: m={self.m}, k={self.k}")
def _calculate_m(self, n: int, p: float) -> int:
m = - (n * math.log(p)) / (math.log(2) ** 2)
return int(m)
def _calculate_k(self, m: int, n: int) -> int:
k = (m / n) * math.log(2)
return max(1, int(round(k)))
def _get_offsets(self, item: Any) -> List[int]:
"""
计算一个元素对应的k个位偏移量。
"""
offsets = []
for i in range(self.k):
# 使用不同的种子(索引)和哈希算法(SHA256)生成哈希值
item_str = str(item).encode('utf-8')
seed = str(i).encode('utf-8')
hash_obj = hashlib.sha256(item_str + seed)
hash_hex = hash_obj.hexdigest()
hash_int = int(hash_hex, 16) % self.m
offsets.append(hash_int)
return offsets
def add(self, item: Any) -> None:
"""
添加元素到布隆过滤器。
"""
offsets = self._get_offsets(item)
pipeline = self.client.pipeline()
for offset in offsets:
pipeline.setbit(self.filter_key, offset, 1)
pipeline.execute()
def contains(self, item: Any) -> bool:
"""
检查元素是否可能在布隆过滤器中。
"""
offsets = self._get_offsets(item)
pipeline = self.client.pipeline()
for offset in offsets:
pipeline.getbit(self.filter_key, offset)
# 结果是一个列表,如 [1, 1, 0]
results = pipeline.execute()
# 如果所有位都是1,返回True;否则返回False
return all(results)
def __contains__(self, item: Any) -> bool:
return self.contains(item)
def clear(self) -> None:
"""删除布隆过滤器(清空位图)"""
self.client.delete(self.filter_key)
# 示例用法
if __name__ == "__main__":
# 连接Redis
r_client = redis.Redis(host='localhost', port=6379, db=0)
filter_key = "my_legacy_bloom_filter"
capacity = 5000 # 容量不宜设置过大,否则Redis位图操作和网络传输可能成为瓶颈
error_rate = 0.02
legacy_bloom = RedisLegacyBloomFilter(r_client, filter_key, capacity, error_rate)
# 添加元素
legacy_bloom.add("user:1001")
legacy_bloom.add("user:1002")
# 检查元素
print(f"Contains 'user:1001': {legacy_bloom.contains('user:1001')}")
print(f"Contains 'user:1003': {legacy_bloom.contains('user:1003')}")
# 使用 `in` 语法
print(f"'user:1002' in filter? {'user:1002' in legacy_bloom}")
# 清理(可选)
# legacy_bloom.clear()
说明:
- 这个实现性能较低,因为每个
add
或contains
操作都需要进行k
次Redis往返(使用pipeline后减少到一次),并且哈希计算在Python端进行。 - 位图存储在Redis的一个普通字符串键中。巨大的位图(大的
m
)可能会消耗大量内存,并且Redis需要处理大字符串。 - 仅推荐在无法使用RedisBloom模块时使用此方法。
8. 自动版本检测与方案选择
理论上可以实现自动检测Redis版本并选择对应的实现方案。
✅ 实现思路
- 连接Redis后,使用
INFO SERVER
命令获取Redis版本信息。 - 解析版本字符串(例如
"7.2.4"
)。 - 如果主版本号 >= 5,并且检测到
redisbloom
模块已加载(可用MODULE LIST
命令检查),则优先使用基于RedisBloom
的方案 (RedisBloomFilter
)。 - 如果版本号 < 5,或者版本号 >=5 但未找到
redisbloom
模块,则回退到使用手动实现的方案 (RedisLegacyBloomFilter
)。
🖥️ 封装代码示例
# auto_select_bloom.py
import redis
from redisbloom.client import Client
import re
class AutoRedisBloomFilter:
"""
自动根据Redis版本和模块支持选择布隆过滤器实现。
"""
def __init__(self, host: str = 'localhost', port: int = 6379, db: int = 0,
filter_name: str = "auto_bloom_filter", capacity: int = 10000, error_rate: float = 0.01):
"""
初始化自动布隆过滤器。
Args:
host, port, db: Redis连接参数。
filter_name: 布隆过滤器的键名。
capacity: 预期容量。
error_rate: 可接受的误判率。
"""
self.host = host
self.port = port
self.db = db
self.filter_name = filter_name
self.capacity = capacity
self.error_rate = error_rate
# 首先创建一个标准Redis连接来获取信息
self._standard_client = redis.Redis(host=host, port=port, db=db)
self._bloom_impl = self._select_implementation()
def _select_implementation(self):
"""
选择布隆过滤器的实现。
Returns:
选择的实现类实例 (RedisBloomFilter 或 RedisLegacyBloomFilter)。
"""
# 获取Redis服务器信息并解析版本
server_info = self._standard_client.info(section='server')
redis_version_str = server_info.get('redis_version', '0.0.0')
# 使用正则表达式解析版本号,例如 "7.2.4" -> (7, 2, 4)
version_match = re.match(r'(\d+)\.(\d+)\.(\d+)', redis_version_str)
if version_match:
major_version = int(version_match.group(1))
minor_version = int(version_match.group(2))
else:
major_version = 0
# 检查 RedisBloom 模块是否加载
modules_list = self._standard_client.execute_command('MODULE LIST')
module_loaded = False
if modules_list:
for module in modules_list:
# module 是字典,例如 {b'name': b'bf', b'ver': b'20405', ...}
if module.get(b'name') == b'bf':
module_loaded = True
break
print(f"Redis Version: {redis_version_str}, Major: {major_version}, RedisBloom Loaded: {module_loaded}")
# 决策逻辑
if major_version >= 5 and module_loaded:
print("Using RedisBloom implementation.")
# 创建 RedisBloom 的 Client
rb_client = Client(host=self.host, port=self.port, db=self.db)
return RedisBloomFilter(rb_client, self.filter_name, self.capacity, self.error_rate)
else:
print("Using Legacy (manual) implementation.")
# 使用标准Redis客户端
return RedisLegacyBloomFilter(self._standard_client, self.filter_name, self.capacity, self.error_rate)
# 代理方法,将 add, contains 等调用委托给选择的实现
def add(self, item):
return self._bloom_impl.add(item)
def contains(self, item):
return self._bloom_impl.contains(item)
def __contains__(self, item):
return self.contains(item)
# ... 其他需要代理的方法 ...
# 示例使用 AutoRedisBloomFilter
if __name__ == "__main__":
auto_filter = AutoRedisBloomFilter(host='localhost', port=6379, db=0,
filter_name="my_auto_filter", capacity=5000, error_rate=0.01)
auto_filter.add("test_item_auto")
print(f"Contains 'test_item_auto': {auto_filter.contains('test_item_auto')}")
⚠️ 注意事项
- 版本检测逻辑:需要可靠地解析Redis版本字符串。
- 模块检测:
MODULE LIST
命令需要Redis 4.0+。对于更老的版本,可能无法检测模块,只能回退到手动实现。 - 接口一致性:两种实现的API(如方法的返回值、有无某些方法)可能略有不同,需要在代理层(
AutoRedisBloomFilter
)妥善处理这些差异。 - 错误处理:网络错误、Redis连接问题等需要统一处理。
- 复杂性:引入了一层间接性,增加了复杂性。如果明确知道环境,直接使用对应的实现可能更简单清晰。
结论:从技术上讲,可以实现自动切换。但对于生产环境,如果环境可控,更常见的做法是直接部署RedisBloom模块并使用对应的客户端,避免不必要的复杂性。自动切换方案更适合需要兼容多种未知部署环境的库或工具。
9.性能测试脚本
# bloom_filter_test.py
import time
import random
import string
import redis
from redisbloom.client import Client
import numpy as np
from pybloom_live import BloomFilter, ScalableBloomFilter
import matplotlib.pyplot as plt
from utils.bloom_local_byself import LocalBloomFilter
from utils.bloom_redis_with_bloom import RedisBloomFilter
from utils.bloom_redis_without_bloom import RedisLegacyBloomFilter
def generate_random_string(length=10):
"""生成指定长度的随机字符串"""
letters = string.ascii_lowercase
return ''.join(random.choice(letters) for i in range(length))
def test_bloom_filter_performance(bloom_filter, sample_size=10000, test_size=2000):
"""
全面测试布隆过滤器的性能和误判率
Args:
bloom_filter: 要测试的布隆过滤器实例(必须实现 add 和 contains 方法)
sample_size (int): 要插入的样本数量
test_size (int): 用于测试误判率的、肯定不存在的样本数量
"""
print(f"开始测试布隆过滤器性能...")
print(f"过滤器参数: {bloom_filter}")
print(f"样本大小: {sample_size}, 测试大小: {test_size}")
print("-" * 60)
# 1. 生成测试数据
print("生成测试数据...")
existing_items = [generate_random_string(12) for _ in range(sample_size)]
non_existing_items = [generate_random_string(12) for _ in range(test_size)]
# 确保两组数据没有交集
while set(existing_items) & set(non_existing_items):
non_existing_items = [generate_random_string(12) for _ in range(test_size)]
# 2. 测试插入性能
print("测试插入性能...")
insert_times = []
start_total_time = time.time()
for item in existing_items:
start_time = time.perf_counter()
bloom_filter.add(item)
end_time = time.perf_counter()
insert_times.append((end_time - start_time) * 1000) # 转换为毫秒
total_insert_time = time.time() - start_total_time
avg_insert_time = np.mean(insert_times)
print(f"总插入时间: {total_insert_time:.4f} 秒")
print(f"平均每次插入时间: {avg_insert_time:.6f} 毫秒")
print(f"总插入吞吐量: {sample_size / total_insert_time:.2f} 操作/秒")
# 3. 测试查询性能(查询已存在的元素)
print("\n测试查询性能(存在的元素)...")
query_existing_times = []
start_total_time = time.time()
for item in existing_items[:test_size]: # 只测试前 test_size 个
start_time = time.perf_counter()
method = getattr(bloom_filter, 'contains', None)
if method:
result = bloom_filter.contains(item)
else:
result = item in bloom_filter # 使用 in 操作符而不是 contains 方法
end_time = time.perf_counter()
query_existing_times.append((end_time - start_time) * 1000) # 转换为毫秒
assert result, f"存在的元素 {item} 应该被找到!"
total_query_existing_time = time.time() - start_total_time
avg_query_existing_time = np.mean(query_existing_times)
print(f"总查询时间(存在元素): {total_query_existing_time:.4f} 秒")
print(f"平均每次查询时间(存在元素): {avg_query_existing_time:.6f} 毫秒")
print(f"查询吞吐量(存在元素): {test_size / total_query_existing_time:.2f} 操作/秒")
# 4. 测试查询性能(查询不存在的元素)
print("\n测试查询性能(不存在的元素)...")
query_nonexisting_times = []
false_positives = 0
start_total_time = time.time()
for item in non_existing_items:
start_time = time.perf_counter()
method = getattr(bloom_filter, 'contains', None)
if method:
result = bloom_filter.contains(item)
else:
result = item in bloom_filter # 使用 in 操作符而不是 contains 方法
end_time = time.perf_counter()
query_nonexisting_times.append((end_time - start_time) * 1000) # 转换为毫秒
if result:
false_positives += 1
total_query_nonexisting_time = time.time() - start_total_time
avg_query_nonexisting_time = np.mean(query_nonexisting_times)
false_positive_rate = false_positives / test_size
try:
print(f"总查询时间(不存在元素): {total_query_nonexisting_time:.4f} 秒")
print(f"平均每次查询时间(不存在元素): {avg_query_nonexisting_time:.6f} 毫秒")
print(f"查询吞吐量(不存在元素): {test_size / total_query_nonexisting_time:.2f} 操作/秒")
except ZeroDivisionError:
print("Too fast!!! Can not calculate the cost time.")
print(f"误判数量: {false_positives}/{test_size}")
print(f"实际误判率: {false_positive_rate:.6f} ({false_positive_rate*100:.4f}%)")
# 5. 返回测试结果
return {
"sample_size": sample_size,
"test_size": test_size,
"total_insert_time": total_insert_time,
"avg_insert_time": avg_insert_time,
"insert_throughput": sample_size / total_insert_time,
"total_query_existing_time": total_query_existing_time,
"avg_query_existing_time": avg_query_existing_time,
"query_existing_throughput": test_size / total_query_existing_time,
"total_query_nonexisting_time": total_query_nonexisting_time,
"avg_query_nonexisting_time": avg_query_nonexisting_time,
"query_nonexisting_throughput": test_size / total_query_nonexisting_time,
"false_positives": false_positives,
"false_positive_rate": false_positive_rate,
"insert_times": insert_times,
"query_existing_times": query_existing_times,
"query_nonexisting_times": query_nonexisting_times
}
def plot_performance(results, bloom_filter_name):
"""绘制性能测试图表"""
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))
fig.suptitle(f'Bloom Filter Performance: {bloom_filter_name}', fontsize=16)
# 1. 插入时间分布
ax1.hist(results['insert_times'], bins=50, alpha=0.7, color='blue', edgecolor='black')
ax1.set_title('Insert Time Distribution')
ax1.set_xlabel('Time (milliseconds)')
ax1.set_ylabel('Frequency')
ax1.axvline(np.mean(results['insert_times']), color='red', linestyle='dashed', linewidth=1, label=f'Mean: {np.mean(results["insert_times"]):.6f} ms')
ax1.legend()
# 2. 查询时间分布(存在元素)
ax2.hist(results['query_existing_times'], bins=50, alpha=0.7, color='green', edgecolor='black')
ax2.set_title('Query Time Distribution (Existing Items)')
ax2.set_xlabel('Time (milliseconds)')
ax2.set_ylabel('Frequency')
ax2.axvline(np.mean(results['query_existing_times']), color='red', linestyle='dashed', linewidth=1, label=f'Mean: {np.mean(results["query_existing_times"]):.6f} ms')
ax2.legend()
# 3. 查询时间分布(不存在元素)
ax3.hist(results['query_nonexisting_times'], bins=50, alpha=0.7, color='orange', edgecolor='black')
ax3.set_title('Query Time Distribution (Non-existing Items)')
ax3.set_xlabel('Time (milliseconds)')
ax3.set_ylabel('Frequency')
ax3.axvline(np.mean(results['query_nonexisting_times']), color='red', linestyle='dashed', linewidth=1, label=f'Mean: {np.mean(results["query_nonexisting_times"]):.6f} ms')
ax3.legend()
# 4. 性能指标对比
metrics = ['Insert', 'Query(Exist)', 'Query(Non-exist)']
times = [results['avg_insert_time'], results['avg_query_existing_time'], results['avg_query_nonexisting_time']]
throughputs = [results['insert_throughput'], results['query_existing_throughput'], results['query_nonexisting_throughput']]
ax4_1 = ax4.twinx()
x = range(len(metrics))
width = 0.35
bars1 = ax4.bar(x, times, width, label='Avg Time (ms)', color='lightblue', edgecolor='black')
bars2 = ax4_1.bar([i + width for i in x], throughputs, width, label='Throughput (ops/s)', color='lightcoral', edgecolor='black')
ax4.set_xlabel('Operation Type')
ax4.set_ylabel('Average Time (milliseconds)', color='lightblue')
ax4_1.set_ylabel('Throughput (operations/second)', color='lightcoral')
ax4.set_title('Performance Comparison')
ax4.set_xticks([i + width/2 for i in x])
ax4.set_xticklabels(metrics)
# 添加数值标签
for i, v in enumerate(times):
ax4.text(i, v + 0.001, f'{v:.4f}', ha='center', va='bottom')
for i, v in enumerate(throughputs):
ax4_1.text(i + width, v + 5, f'{v:.0f}', ha='center', va='bottom')
ax4.legend(loc='upper left')
ax4_1.legend(loc='upper right')
plt.tight_layout()
plt.savefig(f'bloom_filter_performance_{bloom_filter_name}.png', dpi=300, bbox_inches='tight')
# plt.show()
def print_summary(results, bloom_filter_name):
"""打印测试结果摘要"""
print("\n" + "="*60)
print(f"布隆过滤器性能测试摘要 - {bloom_filter_name}")
print("="*60)
print(f"样本大小: {results['sample_size']}")
print(f"测试大小: {results['test_size']}")
print(f"误判率: {results['false_positive_rate']*100:.6f}% ({results['false_positives']}/{results['test_size']})")
print(f"平均插入时间: {results['avg_insert_time']:.6f} 毫秒")
print(f"平均查询时间(存在): {results['avg_query_existing_time']:.6f} 毫秒")
print(f"平均查询时间(不存在): {results['avg_query_nonexisting_time']:.6f} 毫秒")
print(f"插入吞吐量: {results['insert_throughput']:.2f} 操作/秒")
print(f"查询吞吐量(存在): {results['query_existing_throughput']:.2f} 操作/秒")
print(f"查询吞吐量(不存在): {results['query_nonexisting_throughput']:.2f} 操作/秒")
print("="*60)
# 使用示例
if __name__ == "__main__":
# 创建布隆过滤器实例
capacity = 100000
error_rate = 0.001
# 没有过滤器模块的Redis服务配置
redis_without_bloom_configs = {
"host": '192.168.48.131',
"port": 6380,
"db": 0
}
# 有过滤器模块的Redis服务配置
redis_with_bloom_configs = {
"host": '192.168.48.131',
"port": 6379,
"db": 0
}
local_bloom_cls = [LocalBloomFilter, BloomFilter, ScalableBloomFilter]
redis_bloom_cls = [RedisLegacyBloomFilter, RedisBloomFilter]
bloom_filter_instances = {}
for cls in local_bloom_cls:
bloom_filter_instances[str(cls)] = cls(capacity, error_rate)
r_client = redis.Redis(
host=redis_without_bloom_configs['host'],
port=redis_without_bloom_configs['port'],
db=redis_without_bloom_configs['db']
)
bloom_filter_instances[str(RedisLegacyBloomFilter)] = RedisLegacyBloomFilter(r_client, "legacy_bloom_filter", capacity, error_rate)
rb_client = Client(
host=redis_with_bloom_configs['host'],
port=redis_with_bloom_configs['port'],
db=redis_with_bloom_configs['db']
)
filter_name = "redis_bloom_filter"
bloom_filter_instances[str(RedisBloomFilter)] = RedisBloomFilter(rb_client, "redis_bloom_filter", capacity, error_rate)
for k, v in bloom_filter_instances.items():
# 运行性能测试
results = test_bloom_filter_performance(v, sample_size=100000, test_size=40000)
# 打印摘要和图表
print_summary(results, k)
plot_performance(results, k.split(".")[-1][0:-2])
需要手动配置本地的
Redis
服务的地址与端口等数据,具体变量名为:
redis_without_bloom_configs
redis_with_bloom_configs
10.性能测试结果
Manual Redis Bloom Filter params: m=1437758, k=10
开始测试布隆过滤器性能...
过滤器参数: BloomFilter(位数组大小=1437758, 哈希函数数=10, 已添加元素≈0, 当前理论误判率≈0.000000)
样本大小: 100000, 测试大小: 40000
------------------------------------------------------------
生成测试数据...
测试插入性能...
总插入时间: 0.2611 秒
平均每次插入时间: 0.002455 毫秒
总插入吞吐量: 382990.05 操作/秒
测试查询性能(存在的元素)...
总查询时间(存在元素): 0.1109 秒
平均每次查询时间(存在元素): 0.002614 毫秒
查询吞吐量(存在元素): 360814.31 操作/秒
测试查询性能(不存在的元素)...
总查询时间(不存在元素): 0.0967 秒
平均每次查询时间(不存在元素): 0.002260 毫秒
查询吞吐量(不存在元素): 413822.08 操作/秒
误判数量: 30/40000
实际误判率: 0.000750 (0.0750%)
============================================================
布隆过滤器性能测试摘要 - <class 'utils.bloom_local_byself.LocalBloomFilter'>
============================================================
样本大小: 100000
测试大小: 40000
误判率: 0.075000% (30/40000)
平均插入时间: 0.002455 毫秒
平均查询时间(存在): 0.002614 毫秒
平均查询时间(不存在): 0.002260 毫秒
插入吞吐量: 382990.05 操作/秒
查询吞吐量(存在): 360814.31 操作/秒
查询吞吐量(不存在): 413822.08 操作/秒
============================================================
开始测试布隆过滤器性能...
过滤器参数: <pybloom_live.pybloom.BloomFilter object at 0x0000026A90217CA0>
样本大小: 100000, 测试大小: 40000
------------------------------------------------------------
生成测试数据...
测试插入性能...
总插入时间: 0.3619 秒
平均每次插入时间: 0.003461 毫秒
总插入吞吐量: 276344.30 操作/秒
测试查询性能(存在的元素)...
总查询时间(存在元素): 0.1320 秒
平均每次查询时间(存在元素): 0.003122 毫秒
查询吞吐量(存在元素): 303089.50 操作/秒
测试查询性能(不存在的元素)...
总查询时间(不存在元素): 0.0850 秒
平均每次查询时间(不存在元素): 0.001973 毫秒
查询吞吐量(不存在元素): 470420.45 操作/秒
误判数量: 38/40000
实际误判率: 0.000950 (0.0950%)
============================================================
布隆过滤器性能测试摘要 - <class 'pybloom_live.pybloom.BloomFilter'>
============================================================
样本大小: 100000
测试大小: 40000
误判率: 0.095000% (38/40000)
平均插入时间: 0.003461 毫秒
平均查询时间(存在): 0.003122 毫秒
平均查询时间(不存在): 0.001973 毫秒
插入吞吐量: 276344.30 操作/秒
查询吞吐量(存在): 303089.50 操作/秒
查询吞吐量(不存在): 470420.45 操作/秒
============================================================
开始测试布隆过滤器性能...
过滤器参数: <pybloom_live.pybloom.ScalableBloomFilter object at 0x0000026AB21F5C60>
样本大小: 100000, 测试大小: 40000
------------------------------------------------------------
生成测试数据...
测试插入性能...
总插入时间: 0.5860 秒
平均每次插入时间: 0.005695 毫秒
总插入吞吐量: 170644.31 操作/秒
测试查询性能(存在的元素)...
总查询时间(存在元素): 0.1462 秒
平均每次查询时间(存在元素): 0.003485 毫秒
查询吞吐量(存在元素): 273591.23 操作/秒
测试查询性能(不存在的元素)...
总查询时间(不存在元素): 0.0981 秒
平均每次查询时间(不存在元素): 0.002287 毫秒
查询吞吐量(不存在元素): 407684.00 操作/秒
误判数量: 31/40000
实际误判率: 0.000775 (0.0775%)
============================================================
布隆过滤器性能测试摘要 - <class 'pybloom_live.pybloom.ScalableBloomFilter'>
============================================================
样本大小: 100000
测试大小: 40000
误判率: 0.077500% (31/40000)
平均插入时间: 0.005695 毫秒
平均查询时间(存在): 0.003485 毫秒
平均查询时间(不存在): 0.002287 毫秒
插入吞吐量: 170644.31 操作/秒
查询吞吐量(存在): 273591.23 操作/秒
查询吞吐量(不存在): 407684.00 操作/秒
============================================================
开始测试布隆过滤器性能...
过滤器参数: <utils.bloom_redis_without_bloom.RedisLegacyBloomFilter object at 0x0000026AB21F5D80>
样本大小: 100000, 测试大小: 40000
------------------------------------------------------------
生成测试数据...
测试插入性能...
总插入时间: 56.5457 秒
平均每次插入时间: 0.564914 毫秒
总插入吞吐量: 1768.48 操作/秒
测试查询性能(存在的元素)...
总查询时间(存在元素): 23.1157 秒
平均每次查询时间(存在元素): 0.577352 毫秒
查询吞吐量(存在元素): 1730.43 操作/秒
测试查询性能(不存在的元素)...
总查询时间(不存在元素): 23.2851 秒
平均每次查询时间(不存在元素): 0.581571 毫秒
查询吞吐量(不存在元素): 1717.84 操作/秒
误判数量: 51/40000
实际误判率: 0.001275 (0.1275%)
============================================================
布隆过滤器性能测试摘要 - <class 'utils.bloom_redis_without_bloom.RedisLegacyBloomFilter'>
============================================================
样本大小: 100000
测试大小: 40000
误判率: 0.127500% (51/40000)
平均插入时间: 0.564914 毫秒
平均查询时间(存在): 0.577352 毫秒
平均查询时间(不存在): 0.581571 毫秒
插入吞吐量: 1768.48 操作/秒
查询吞吐量(存在): 1730.43 操作/秒
查询吞吐量(不存在): 1717.84 操作/秒
============================================================
开始测试布隆过滤器性能...
过滤器参数: <utils.bloom_redis_with_bloom.RedisBloomFilter object at 0x0000026AB21F5E40>
样本大小: 100000, 测试大小: 40000
------------------------------------------------------------
生成测试数据...
测试插入性能...
总插入时间: 49.4930 秒
平均每次插入时间: 0.494393 毫秒
总插入吞吐量: 2020.49 操作/秒
测试查询性能(存在的元素)...
总查询时间(存在元素): 19.8955 秒
平均每次查询时间(存在元素): 0.496794 毫秒
查询吞吐量(存在元素): 2010.50 操作/秒
测试查询性能(不存在的元素)...
总查询时间(不存在元素): 19.3901 秒
查询吞吐量(不存在元素): 2062.91 操作/秒
误判数量: 12/40000
实际误判率: 0.000300 (0.0300%)
============================================================
布隆过滤器性能测试摘要 - <class 'utils.bloom_redis_with_bloom.RedisBloomFilter'>
============================================================
样本大小: 100000
测试大小: 40000
误判率: 0.030000% (12/40000)
平均插入时间: 0.494393 毫秒
平均查询时间(存在): 0.496794 毫秒
插入吞吐量: 2020.49 操作/秒
查询吞吐量(存在): 2010.50 操作/秒
查询吞吐量(不存在): 2062.91 操作/秒
============================================================
11.代码下载
点击下载:BloomFilters.7z