🧠 布隆过滤器详解与Python实现

1. 什么是布隆过滤器?

布隆过滤器(Bloom Filter)是由 Burton Howard Bloom 在 1970 年提出的一种空间效率极高的概率型数据结构。它用于判断一个元素是否在一个集合中。这种判断是概率性的:它可能会误判(False Positive,即告诉你某个元素存在而实际上并不存在),但它绝不会漏判(False Negative,即如果它说某个元素不存在,那就一定不存在)。

它的核心是一个二进制位数组(Bit Array)(初始值全为0)和一系列哈希函数。由于其概率性和空间效率高的特点,它非常适合那些可以接受一定误判率,但对内存和查询速度有高要求的场景。

2. 为什么要使用布隆过滤器及其使用场景

✨ 优点与价值

布隆过滤器在空间效率和查询时间上通常都远超其他算法,其优点包括:

  • 极高的空间效率:不需要存储元素本身,只需一个位数组,占用内存极少。
  • 极快的查询速度:查询时间复杂度为 O(k),k 是哈希函数个数,通常很小。
  • 保密性强:由于不存储原始数据,在某些场景下有利于数据安全。

🎯 常见应用场景

布隆过滤器非常适合以下场景:

  • 缓存穿透防护:在查询缓存之前,先用布隆过滤器判断数据是否存在,避免大量请求直接访问数据库(查询根本不存在的key)。
  • 网页爬虫 URL 去重:快速判断一个 URL 是否已被爬取,避免重复工作和循环抓取。
  • 垃圾邮件过滤:判断发件人地址是否在垃圾邮件列表中。
  • 大规模数据集快速查找:如判断用户ID是否属于某个特定群体,尽管可能存在误判,但能快速排除大量不在集合中的元素。

⚠️ 缺点与注意事项

布隆过滤器也存在一些缺点:

  • 存在误判率:这是其概率性本质决定的。误判率会随着元素的增加而上升,但可以通过调整参数来控制。
  • 不支持删除操作:传统布隆过滤器很难安全地删除元素,因为删除一个元素(将其对应位置0)可能会影响其他映射到相同位的元素。虽然有计数布隆过滤器(Counting Bloom Filter)(用计数器代替位)的变体支持删除,但会增加空间开销。

3. 布隆过滤器原理详解

布隆过滤器的核心是一个很长的二进制位数组(Bit Array) 和一系列随机映射函数(哈希函数)

🔧 工作流程

  1. 初始化:创建一个长度为 m 的位数组,所有位初始化为 0。同时选择 k 个独立且均匀分布的哈希函数。
  2. 添加元素:当要添加一个元素时,用 k 个哈希函数分别计算该元素的哈希值,得到 k 个位置(索引)。将位数组中这些位置的值都置为 1。
  3. 查询元素:当要查询一个元素是否存在时,同样用这 k 个哈希函数计算哈希值,得到 k 个位置: 如果这些位置有任何一位为 0,则该元素肯定不存在于集合中。 如果这些位置全部为 1,则该元素可能存在于集合中(由于哈希冲突,存在误判的可能)。
bloom.svg

📊 参数选择与误判率

布隆过滤器的误判率主要受三个因素影响:

  • 位数组大小 (m)m 越大,误判率越低,但消耗的内存也越大。
  • 哈希函数数量 (k)k 过多或过少都会导致误判率升高。
  • 已插入元素数量 (n):插入的元素越多,位数组中1的比例越高,误判率也随之升高。

可以根据预期元素数量 n 和期望的误判率 p 来估算最优的 mk

calculate_method.png

4. Python实现本地缓存版布隆过滤器(使用mmh3和bitarray)

使用 mmh3bitarray 库来实现功能相对完善的布隆过滤器。

🧰 环境要求

  • Python 3.6+

  • 安装 mmh3bitarray 库:

    pip install mmh3 bitarray
    

🖥️ 代码实现

# bloom_local_byself.py
import math
import mmh3
from bitarray import bitarray
from typing import Any, List
import array

class LocalBloomFilter:
    """
    使用 mmh3 和 bitarray 实现的布隆过滤器。
    """

    def __init__(self, items_count: int, false_positive_prob: float = 0.01):
        """
        初始化布隆过滤器。

        Args:
            items_count (int): 预期要插入的元素数量。
            false_positive_prob (float): 可接受的误判率(0到1之间)。默认 0.01 (1%)。
        """
        if not 0 < false_positive_prob < 1:
            raise ValueError("false_positive_prob 必须在0和1之间")
        if items_count <= 0:
            raise ValueError("items_count 必须大于0")

        self.n = items_count
        self.p = false_positive_prob

        # 计算最优的位数组大小和哈希函数数量
        self.m = self._calculate_m(self.n, self.p)
        self.k = self._calculate_k(self.m, self.n)

        # 初始化位数组
        self.bit_array = bitarray(self.m)
        self.bit_array.setall(0)
        self._elements_added = 0

    def _calculate_m(self, n: int, p: float) -> int:
        """计算位数组大小 m"""
        m = - (n * math.log(p)) / (math.log(2) ** 2)
        return int(m)

    def _calculate_k(self, m: int, n: int) -> int:
        """计算哈希函数数量 k"""
        k = (m / n) * math.log(2)
        return max(1, int(round(k)))  # 至少为1

    def _get_hash_indexes(self, item: Any) -> List[int]:
        """
        计算一个元素对应的k个位索引。
        使用 mmh3 哈希函数,通过不同的种子模拟多个哈希函数。
        """
        indexes = []
        for i in range(self.k):
            # 使用不同的种子(i)来模拟不同的哈希函数
            hash_val = mmh3.hash(str(item), i) % self.m
            indexes.append(hash_val)
        return indexes

    def add(self, item: Any) -> None:
        """
        将一个元素添加到布隆过滤器中。
        """
        indexes = self._get_hash_indexes(item)
        for index in indexes:
            self.bit_array[index] = 1
        self._elements_added += 1

    def contains(self, item: Any) -> bool:
        """
        检查一个元素是否可能在布隆过滤器中。
        返回:
            True: 元素可能存在(存在误判可能)。
            False: 元素一定不存在。
        """
        indexes = self._get_hash_indexes(item)
        for index in indexes:
            if self.bit_array[index] == 0:
                return False
        return True

    def __contains__(self, item: Any) -> bool:
        """支持 'item in filter' 语法"""
        return self.contains(item)

    @property
    def size(self) -> int:
        """返回位数组大小"""
        return self.m

    @property
    def hash_count(self) -> int:
        """返回哈希函数数量"""
        return self.k

    @property
    def elements_added(self) -> int:
        """返回已添加的元素数量"""
        return self._elements_added

    @property
    def current_false_positive_prob(self) -> float:
        """计算当前的误判率(理论估算值)"""
        # 公式: (1 - e^(-k * n / m)) ^ k
        n = self._elements_added
        m = self.m
        k = self.k
        return pow(1 - math.exp(-k * n / m), k)

    def __str__(self) -> str:
        """返回布隆过滤器的一些基本信息"""
        return (f"BloomFilter(位数组大小={self.m}, "
                f"哈希函数数={self.k}, "
                f"已添加元素≈{self._elements_added}, "
                f"当前理论误判率≈{self.current_false_positive_prob:.6f})")


# 示例用法和测试
if __name__ == "__main__":
    # 创建一个布隆过滤器,预期插入1000个元素,目标误判率1%
    bloom_filter = LocalBloomFilter(1000, 0.01)

    print(bloom_filter)
    print("-" * 50)

    # 添加一些元素
    items_to_add = ["apple", "banana", "cherry", "date", "elderberry"]
    for item in items_to_add:
        bloom_filter.add(item)
        print(f"添加了: '{item}'")

    print("-" * 50)
    print(f"添加后: {bloom_filter}")

    # 检查元素是否存在
    test_items = ["apple", "banana", "fig", "grape"]
    for item in test_items:
        result = bloom_filter.contains(item)
        status = "可能存在" if result else "肯定不存在"
        print(f"检查 '{item}': {status}")

    # 使用 `in` 语法
    print("\n使用 'in' 语法检查:")
    print(f"'cherry' in filter? {'cherry' in bloom_filter}")
    print(f"'watermelon' in filter? {'watermelon' in bloom_filter}")

说明

  • 这个实现使用 mmh3 作为哈希函数,bitarray 来处理位操作,性能比纯Python列表实现要好很多。
  • 哈希函数通过向 mmh3.hash 传递不同的种子(i)来模拟多个哈希函数。
  • 类提供了计算最优 mk 的方法,并可以估算当前的误判率。

5. Python借助第三方库pybloom_live实现布隆过滤器

使用 pybloom_live 可以轻松实现强大且高效的布隆过滤器。

🧰 环境要求

  • Python 3.6+

  • 安装 pybloom_live 库:

    pip install pybloom_live
    

🖥️ 代码实现

pybloom_live 提供了两种布隆过滤器:BloomFilter(固定大小)和 ScalableBloomFilter(可扩展,能自动应对超出预期数量的元素,但误判率可能会上升)。

# bloom_local_bythird.py
from pybloom_live import BloomFilter, ScalableBloomFilter

# 1. 使用固定大小的 BloomFilter
# 参数:容量 capacity, 误判率 error_rate
bf = BloomFilter(capacity=10000, error_rate=0.001)
print(f"固定布隆过滤器:容量={bf.capacity}, 误判率={bf.error_rate}")

# 添加元素
bf.add("www.example.com")
bf.add("www.google.com")

# 检查元素
print(f"Contains 'www.example.com': {'www.example.com' in bf}")  # True
print(f"Contains 'www.github.com': {'www.github.com' in bf}")  # False (很可能)

# 当前元素数量 (近似值)
print(f"Approximate number of items: {len(bf)}")

# 2. 使用可扩展的 ScalableBloomFilter
# 参数:初始模式 mode (SMALL_SET_GROWTH or LARGE_SET_GROWTH), 初始误判率 error_rate
sbf = ScalableBloomFilter(initial_capacity=100, mode=ScalableBloomFilter.SMALL_SET_GROWTH, error_rate=0.001)

# 添加大量元素,超出初始容量也没关系
for i in range(1000):
    sbf.add(f"item_{i}")

# 检查元素
print(f"Contains 'item_999': {'item_999' in sbf}")  # True
print(f"Contains 'item_2000': {'item_2000' in sbf}")  # False
print(f"Approximate number of items in scalable filter: {len(sbf)}")

说明

  • pybloom_live 库经过优化,性能很好。
  • BloomFilter 在元素数量超过 capacity 后,误判率会急剧上升。
  • ScalableBloomFilter 通过使用多个过滤器层来自动扩展,更适合不确定最终元素数量的场景。

要注意,pybloom_live中的过滤器对象未实现contains方法,但支持in语法进行判断

6. Python实现基于Redis (5.0+) 的布隆过滤器

Redis 5.0及以上版本可以通过加载 RedisBloom 模块来获得原生的布隆过滤器支持,这是官方推荐的方案,性能和功能都最好。

🧰 环境要求

  • Redis Server 5.0+

  • RedisBloom 模块已安装并加载。使用Docker部署(直接部署带有过滤器模块的版本):

    docker run -p 6379:6379 --name redis-redisbloom redislabs/rebloom:latest
    
  • Python 3.6+

  • Redis Python客户端redisbloom 库:

    pip install redis redisbloom
    

🖥️ 代码实现

# bloom_redis_with_bloom.py
from redisbloom.client import Client

class RedisBloomFilter:
    """
    使用 RedisBloom 模块的布隆过滤器封装。
    适用于 Redis 5.0+ 并已加载 RedisBloom 模块。
    """

    def __init__(self, redis_client: Client, filter_name: str, capacity: int, error_rate: float = 0.01):
        """
        初始化Redis布隆过滤器。

        Args:
            redis_client (Client): 已连接的RedisBloom客户端实例。
            filter_name (str): 布隆过滤器在Redis中的键名。
            capacity (int): 预期容量。
            error_rate (float): 可接受的误判率。
        """
        self.client = redis_client
        self.filter_name = filter_name
        self.capacity = capacity
        self.error_rate = error_rate

        # 显式创建过滤器。如果已存在,则忽略。
        # BF.RESERVE key error_rate capacity
        try:
            self.client.bfCreate(self.filter_name, self.error_rate, self.capacity)
        except Exception as e:
            # 常见的错误是过滤器已存在,这通常可以忽略
            if 'exists' not in str(e).lower():
                print(f"过滤器可能已存在或创建时出错(可忽略): {e}")

    def add(self, item) -> bool:
        """
        添加一个元素到布隆过滤器。
        Returns:
            bool: 如果元素是新添加的(可能之前不存在),返回True;如果可能已经存在,返回False。
                  注意:由于布隆过滤器的特性,返回False不一定代表之前一定存在。
        """
        # BF.ADD key item
        return self.client.bfAdd(self.filter_name, item)

    def add_multi(self, items: list) -> list:
        """
        批量添加元素。
        Returns:
            list: 每个元素的添加结果列表 (1表示新添加, 0表示可能已存在)。
        """
        # BF.MADD key item1 item2 ...
        return self.client.bfMAdd(self.filter_name, *items)

    def contains(self, item) -> bool:
        """
        检查元素是否可能在布隆过滤器中。
        Returns:
            bool: True表示可能存在,False表示肯定不存在。
        """
        # BF.EXISTS key item
        return self.client.bfExists(self.filter_name, item)

    def contains_multi(self, items: list) -> list:
        """
        批量检查元素。
        Returns:
            list: 每个元素的存在结果列表 (1表示可能存在, 0表示肯定不存在)。
        """
        # BF.MEXISTS key item1 item2 ...
        return self.client.bfMExists(self.filter_name, *items)

    def info(self) -> dict:
        """
        获取布隆过滤器的信息。
        Returns:
            dict: 包含容量、大小、哈希函数数量等信息的字典。
        """
        # BF.INFO key
        return self.client.bfInfo(self.filter_name)


# 示例用法
if __name__ == "__main__":
    import redis

    # 方式1:使用 redisbloom 的 Client
    rb_client = Client(host='localhost', port=6379, db=0)
    # 方式2:也可以使用标准的redis客户端,但命令需要稍作调整
    # standard_client = redis.Redis(host='localhost', port=6379, db=0)
    # rb_client = Client.from_redis(standard_client)

    # 创建过滤器实例
    filter_name = "my_redis_bloom_filter"
    capacity = 10000
    error_rate = 0.001

    redis_bloom = RedisBloomFilter(rb_client, filter_name, capacity, error_rate)

    # 添加元素
    print(f"Add 'apple': {redis_bloom.add('apple')}")  # 可能是 True (新添加)
    print(f"Add 'banana': {redis_bloom.add('banana')}") # 可能是 True
    print(f"Add 'apple' again: {redis_bloom.add('apple')}") # 可能是 False (可能已存在)

    # 检查元素
    print(f"Contains 'apple': {redis_bloom.contains('apple')}")    # True
    print(f"Contains 'orange': {redis_bloom.contains('orange')}")   # False

    # 批量操作
    items = ['grape', 'kiwi', 'banana']
    add_results = redis_bloom.add_multi(items)
    print(f"Batch add results: {add_results}")

    check_items = ['apple', 'kiwi', 'mango']
    exist_results = redis_bloom.contains_multi(check_items)
    print(f"Batch check results: {exist_results}")

    # 获取信息
    info = redis_bloom.info()
    print(f"Filter Info: Capacity={info.get('capacity')}, Size={info.get('size')}, Hashes={info.get('numHashes')}")

7. Python实现基于Redis (5.0以下版本) 的布隆过滤器

对于旧版Redis,没有原生命令,但可以利用Redis的 SETBITGETBIT 命令,在Python端实现哈希计算,手动模拟一个布隆过滤器,适用于任何版本。

🧰 环境要求

  • Redis Server (任何版本,支持 SETBIT/GETBIT 即可)。使用Docker部署:

    docker run -p 6379:6379 --name redis-old redis:4.0-alpine
    
  • Python 3.6+

  • Redis Python客户端

    pip install redis
    

🖥️ 代码实现

# bloom_redis_without_bloom.py
import math
import hashlib
import redis
from typing import Any, List

class RedisLegacyBloomFilter:
    """
    用于 Redis 5.0 以下版本的手动实现布隆过滤器。
    在Python端计算哈希和位偏移,使用Redis的位图(bitmap)存储。
    """

    def __init__(self, redis_client: redis.Redis, filter_key: str, capacity: int, error_rate: float = 0.01):
        """
        初始化旧版Redis布隆过滤器。

        Args:
            redis_client (redis.Redis): 已连接的Redis客户端实例。
            filter_key (str): 用于存储位图的Redis键名。
            capacity (int): 预期容量。
            error_rate (float): 可接受的误判率。
        """
        self.client = redis_client
        self.filter_key = filter_key
        self.capacity = capacity
        self.error_rate = error_rate

        # 计算参数
        self.m = self._calculate_m(self.capacity, self.error_rate)  # 位数组大小
        self.k = self._calculate_k(self.m, self.capacity)          # 哈希函数数量

        print(f"Manual Redis Bloom Filter params: m={self.m}, k={self.k}")

    def _calculate_m(self, n: int, p: float) -> int:
        m = - (n * math.log(p)) / (math.log(2) ** 2)
        return int(m)

    def _calculate_k(self, m: int, n: int) -> int:
        k = (m / n) * math.log(2)
        return max(1, int(round(k)))

    def _get_offsets(self, item: Any) -> List[int]:
        """
        计算一个元素对应的k个位偏移量。
        """
        offsets = []
        for i in range(self.k):
            # 使用不同的种子(索引)和哈希算法(SHA256)生成哈希值
            item_str = str(item).encode('utf-8')
            seed = str(i).encode('utf-8')
            hash_obj = hashlib.sha256(item_str + seed)
            hash_hex = hash_obj.hexdigest()
            hash_int = int(hash_hex, 16) % self.m
            offsets.append(hash_int)
        return offsets

    def add(self, item: Any) -> None:
        """
        添加元素到布隆过滤器。
        """
        offsets = self._get_offsets(item)
        pipeline = self.client.pipeline()
        for offset in offsets:
            pipeline.setbit(self.filter_key, offset, 1)
        pipeline.execute()

    def contains(self, item: Any) -> bool:
        """
        检查元素是否可能在布隆过滤器中。
        """
        offsets = self._get_offsets(item)
        pipeline = self.client.pipeline()
        for offset in offsets:
            pipeline.getbit(self.filter_key, offset)
        # 结果是一个列表,如 [1, 1, 0]
        results = pipeline.execute()
        # 如果所有位都是1,返回True;否则返回False
        return all(results)

    def __contains__(self, item: Any) -> bool:
        return self.contains(item)

    def clear(self) -> None:
        """删除布隆过滤器(清空位图)"""
        self.client.delete(self.filter_key)


# 示例用法
if __name__ == "__main__":
    # 连接Redis
    r_client = redis.Redis(host='localhost', port=6379, db=0)

    filter_key = "my_legacy_bloom_filter"
    capacity = 5000  # 容量不宜设置过大,否则Redis位图操作和网络传输可能成为瓶颈
    error_rate = 0.02

    legacy_bloom = RedisLegacyBloomFilter(r_client, filter_key, capacity, error_rate)

    # 添加元素
    legacy_bloom.add("user:1001")
    legacy_bloom.add("user:1002")

    # 检查元素
    print(f"Contains 'user:1001': {legacy_bloom.contains('user:1001')}")
    print(f"Contains 'user:1003': {legacy_bloom.contains('user:1003')}")

    # 使用 `in` 语法
    print(f"'user:1002' in filter? {'user:1002' in legacy_bloom}")

    # 清理(可选)
    # legacy_bloom.clear()

说明

  • 这个实现性能较低,因为每个 addcontains 操作都需要进行 k 次Redis往返(使用pipeline后减少到一次),并且哈希计算在Python端进行。
  • 位图存储在Redis的一个普通字符串键中。巨大的位图(大的 m)可能会消耗大量内存,并且Redis需要处理大字符串。
  • 仅推荐在无法使用RedisBloom模块时使用此方法

8. 自动版本检测与方案选择

理论上可以实现自动检测Redis版本并选择对应的实现方案。

✅ 实现思路

  1. 连接Redis后,使用 INFO SERVER 命令获取Redis版本信息。
  2. 解析版本字符串(例如 "7.2.4")。
  3. 如果主版本号 >= 5,并且检测到 redisbloom 模块已加载(可用 MODULE LIST 命令检查),则优先使用基于 RedisBloom 的方案 (RedisBloomFilter)。
  4. 如果版本号 < 5,或者版本号 >=5 但未找到 redisbloom 模块,则回退到使用手动实现的方案 (RedisLegacyBloomFilter)。

🖥️ 封装代码示例

# auto_select_bloom.py
import redis
from redisbloom.client import Client
import re

class AutoRedisBloomFilter:
    """
    自动根据Redis版本和模块支持选择布隆过滤器实现。
    """

    def __init__(self, host: str = 'localhost', port: int = 6379, db: int = 0,
                 filter_name: str = "auto_bloom_filter", capacity: int = 10000, error_rate: float = 0.01):
        """
        初始化自动布隆过滤器。

        Args:
            host, port, db: Redis连接参数。
            filter_name: 布隆过滤器的键名。
            capacity: 预期容量。
            error_rate: 可接受的误判率。
        """
        self.host = host
        self.port = port
        self.db = db
        self.filter_name = filter_name
        self.capacity = capacity
        self.error_rate = error_rate

        # 首先创建一个标准Redis连接来获取信息
        self._standard_client = redis.Redis(host=host, port=port, db=db)
        self._bloom_impl = self._select_implementation()

    def _select_implementation(self):
        """
        选择布隆过滤器的实现。
        Returns:
            选择的实现类实例 (RedisBloomFilter 或 RedisLegacyBloomFilter)。
        """
        # 获取Redis服务器信息并解析版本
        server_info = self._standard_client.info(section='server')
        redis_version_str = server_info.get('redis_version', '0.0.0')
        # 使用正则表达式解析版本号,例如 "7.2.4" -> (7, 2, 4)
        version_match = re.match(r'(\d+)\.(\d+)\.(\d+)', redis_version_str)
        if version_match:
            major_version = int(version_match.group(1))
            minor_version = int(version_match.group(2))
        else:
            major_version = 0

        # 检查 RedisBloom 模块是否加载
        modules_list = self._standard_client.execute_command('MODULE LIST')
        module_loaded = False
        if modules_list:
            for module in modules_list:
                # module 是字典,例如 {b'name': b'bf', b'ver': b'20405', ...}
                if module.get(b'name') == b'bf':
                    module_loaded = True
                    break

        print(f"Redis Version: {redis_version_str}, Major: {major_version}, RedisBloom Loaded: {module_loaded}")

        # 决策逻辑
        if major_version >= 5 and module_loaded:
            print("Using RedisBloom implementation.")
            # 创建 RedisBloom 的 Client
            rb_client = Client(host=self.host, port=self.port, db=self.db)
            return RedisBloomFilter(rb_client, self.filter_name, self.capacity, self.error_rate)
        else:
            print("Using Legacy (manual) implementation.")
            # 使用标准Redis客户端
            return RedisLegacyBloomFilter(self._standard_client, self.filter_name, self.capacity, self.error_rate)

    # 代理方法,将 add, contains 等调用委托给选择的实现
    def add(self, item):
        return self._bloom_impl.add(item)

    def contains(self, item):
        return self._bloom_impl.contains(item)

    def __contains__(self, item):
        return self.contains(item)

    # ... 其他需要代理的方法 ...


# 示例使用 AutoRedisBloomFilter
if __name__ == "__main__":
    auto_filter = AutoRedisBloomFilter(host='localhost', port=6379, db=0,
                                      filter_name="my_auto_filter", capacity=5000, error_rate=0.01)

    auto_filter.add("test_item_auto")
    print(f"Contains 'test_item_auto': {auto_filter.contains('test_item_auto')}")

⚠️ 注意事项

  • 版本检测逻辑:需要可靠地解析Redis版本字符串。
  • 模块检测MODULE LIST 命令需要Redis 4.0+。对于更老的版本,可能无法检测模块,只能回退到手动实现。
  • 接口一致性:两种实现的API(如方法的返回值、有无某些方法)可能略有不同,需要在代理层(AutoRedisBloomFilter)妥善处理这些差异。
  • 错误处理:网络错误、Redis连接问题等需要统一处理。
  • 复杂性:引入了一层间接性,增加了复杂性。如果明确知道环境,直接使用对应的实现可能更简单清晰。

结论:从技术上讲,可以实现自动切换。但对于生产环境,如果环境可控,更常见的做法是直接部署RedisBloom模块并使用对应的客户端,避免不必要的复杂性。自动切换方案更适合需要兼容多种未知部署环境的库或工具。

9.性能测试脚本

# bloom_filter_test.py
import time
import random
import string

import redis
from redisbloom.client import Client
import numpy as np
from pybloom_live import BloomFilter, ScalableBloomFilter
import matplotlib.pyplot as plt

from utils.bloom_local_byself import LocalBloomFilter
from utils.bloom_redis_with_bloom import RedisBloomFilter
from utils.bloom_redis_without_bloom import RedisLegacyBloomFilter

def generate_random_string(length=10):
    """生成指定长度的随机字符串"""
    letters = string.ascii_lowercase
    return ''.join(random.choice(letters) for i in range(length))

def test_bloom_filter_performance(bloom_filter, sample_size=10000, test_size=2000):
    """
    全面测试布隆过滤器的性能和误判率

    Args:
        bloom_filter: 要测试的布隆过滤器实例(必须实现 add 和 contains 方法)
        sample_size (int): 要插入的样本数量
        test_size (int): 用于测试误判率的、肯定不存在的样本数量
    """
    print(f"开始测试布隆过滤器性能...")
    print(f"过滤器参数: {bloom_filter}")
    print(f"样本大小: {sample_size}, 测试大小: {test_size}")
    print("-" * 60)

    # 1. 生成测试数据
    print("生成测试数据...")
    existing_items = [generate_random_string(12) for _ in range(sample_size)]
    non_existing_items = [generate_random_string(12) for _ in range(test_size)]
    # 确保两组数据没有交集
    while set(existing_items) & set(non_existing_items):
        non_existing_items = [generate_random_string(12) for _ in range(test_size)]

    # 2. 测试插入性能
    print("测试插入性能...")
    insert_times = []
    start_total_time = time.time()
    
    for item in existing_items:
        start_time = time.perf_counter()
        bloom_filter.add(item)
        end_time = time.perf_counter()
        insert_times.append((end_time - start_time) * 1000)  # 转换为毫秒
    
    total_insert_time = time.time() - start_total_time
    avg_insert_time = np.mean(insert_times)
    print(f"总插入时间: {total_insert_time:.4f} 秒")
    print(f"平均每次插入时间: {avg_insert_time:.6f} 毫秒")
    print(f"总插入吞吐量: {sample_size / total_insert_time:.2f} 操作/秒")

    # 3. 测试查询性能(查询已存在的元素)
    print("\n测试查询性能(存在的元素)...")
    query_existing_times = []
    start_total_time = time.time()
    
    for item in existing_items[:test_size]:  # 只测试前 test_size 个
        start_time = time.perf_counter()
        method = getattr(bloom_filter, 'contains', None)
        if method:
          result = bloom_filter.contains(item)
        else:
          result = item in bloom_filter  # 使用 in 操作符而不是 contains 方法
        end_time = time.perf_counter()
        query_existing_times.append((end_time - start_time) * 1000)  # 转换为毫秒
        assert result, f"存在的元素 {item} 应该被找到!"
    
    total_query_existing_time = time.time() - start_total_time
    avg_query_existing_time = np.mean(query_existing_times)
    print(f"总查询时间(存在元素): {total_query_existing_time:.4f} 秒")
    print(f"平均每次查询时间(存在元素): {avg_query_existing_time:.6f} 毫秒")
    print(f"查询吞吐量(存在元素): {test_size / total_query_existing_time:.2f} 操作/秒")

    # 4. 测试查询性能(查询不存在的元素)
    print("\n测试查询性能(不存在的元素)...")
    query_nonexisting_times = []
    false_positives = 0
    start_total_time = time.time()
    
    for item in non_existing_items:
        start_time = time.perf_counter()
        method = getattr(bloom_filter, 'contains', None)
        if method:
          result = bloom_filter.contains(item)
        else:
          result = item in bloom_filter  # 使用 in 操作符而不是 contains 方法
        end_time = time.perf_counter()
        query_nonexisting_times.append((end_time - start_time) * 1000)  # 转换为毫秒
        
        if result:
            false_positives += 1
    
    total_query_nonexisting_time = time.time() - start_total_time
    avg_query_nonexisting_time = np.mean(query_nonexisting_times)
    false_positive_rate = false_positives / test_size
    
    try:
      print(f"总查询时间(不存在元素): {total_query_nonexisting_time:.4f} 秒")
      print(f"平均每次查询时间(不存在元素): {avg_query_nonexisting_time:.6f} 毫秒")
      print(f"查询吞吐量(不存在元素): {test_size / total_query_nonexisting_time:.2f} 操作/秒")
    except ZeroDivisionError:
      print("Too fast!!! Can not calculate the cost time.")
    print(f"误判数量: {false_positives}/{test_size}")
    print(f"实际误判率: {false_positive_rate:.6f} ({false_positive_rate*100:.4f}%)")

    # 5. 返回测试结果
    return {
        "sample_size": sample_size,
        "test_size": test_size,
        "total_insert_time": total_insert_time,
        "avg_insert_time": avg_insert_time,
        "insert_throughput": sample_size / total_insert_time,
        "total_query_existing_time": total_query_existing_time,
        "avg_query_existing_time": avg_query_existing_time,
        "query_existing_throughput": test_size / total_query_existing_time,
        "total_query_nonexisting_time": total_query_nonexisting_time,
        "avg_query_nonexisting_time": avg_query_nonexisting_time,
        "query_nonexisting_throughput": test_size / total_query_nonexisting_time,
        "false_positives": false_positives,
        "false_positive_rate": false_positive_rate,
        "insert_times": insert_times,
        "query_existing_times": query_existing_times,
        "query_nonexisting_times": query_nonexisting_times
    }

def plot_performance(results, bloom_filter_name):
    """绘制性能测试图表"""
    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))
    fig.suptitle(f'Bloom Filter Performance: {bloom_filter_name}', fontsize=16)
    
    # 1. 插入时间分布
    ax1.hist(results['insert_times'], bins=50, alpha=0.7, color='blue', edgecolor='black')
    ax1.set_title('Insert Time Distribution')
    ax1.set_xlabel('Time (milliseconds)')
    ax1.set_ylabel('Frequency')
    ax1.axvline(np.mean(results['insert_times']), color='red', linestyle='dashed', linewidth=1, label=f'Mean: {np.mean(results["insert_times"]):.6f} ms')
    ax1.legend()
    
    # 2. 查询时间分布(存在元素)
    ax2.hist(results['query_existing_times'], bins=50, alpha=0.7, color='green', edgecolor='black')
    ax2.set_title('Query Time Distribution (Existing Items)')
    ax2.set_xlabel('Time (milliseconds)')
    ax2.set_ylabel('Frequency')
    ax2.axvline(np.mean(results['query_existing_times']), color='red', linestyle='dashed', linewidth=1, label=f'Mean: {np.mean(results["query_existing_times"]):.6f} ms')
    ax2.legend()
    
    # 3. 查询时间分布(不存在元素)
    ax3.hist(results['query_nonexisting_times'], bins=50, alpha=0.7, color='orange', edgecolor='black')
    ax3.set_title('Query Time Distribution (Non-existing Items)')
    ax3.set_xlabel('Time (milliseconds)')
    ax3.set_ylabel('Frequency')
    ax3.axvline(np.mean(results['query_nonexisting_times']), color='red', linestyle='dashed', linewidth=1, label=f'Mean: {np.mean(results["query_nonexisting_times"]):.6f} ms')
    ax3.legend()
    
    # 4. 性能指标对比
    metrics = ['Insert', 'Query(Exist)', 'Query(Non-exist)']
    times = [results['avg_insert_time'], results['avg_query_existing_time'], results['avg_query_nonexisting_time']]
    throughputs = [results['insert_throughput'], results['query_existing_throughput'], results['query_nonexisting_throughput']]
    
    ax4_1 = ax4.twinx()
    x = range(len(metrics))
    width = 0.35
    
    bars1 = ax4.bar(x, times, width, label='Avg Time (ms)', color='lightblue', edgecolor='black')
    bars2 = ax4_1.bar([i + width for i in x], throughputs, width, label='Throughput (ops/s)', color='lightcoral', edgecolor='black')
    
    ax4.set_xlabel('Operation Type')
    ax4.set_ylabel('Average Time (milliseconds)', color='lightblue')
    ax4_1.set_ylabel('Throughput (operations/second)', color='lightcoral')
    ax4.set_title('Performance Comparison')
    ax4.set_xticks([i + width/2 for i in x])
    ax4.set_xticklabels(metrics)
    
    # 添加数值标签
    for i, v in enumerate(times):
        ax4.text(i, v + 0.001, f'{v:.4f}', ha='center', va='bottom')
    
    for i, v in enumerate(throughputs):
        ax4_1.text(i + width, v + 5, f'{v:.0f}', ha='center', va='bottom')
    
    ax4.legend(loc='upper left')
    ax4_1.legend(loc='upper right')
    
    plt.tight_layout()
    plt.savefig(f'bloom_filter_performance_{bloom_filter_name}.png', dpi=300, bbox_inches='tight')
    # plt.show()

def print_summary(results, bloom_filter_name):
    """打印测试结果摘要"""
    print("\n" + "="*60)
    print(f"布隆过滤器性能测试摘要 - {bloom_filter_name}")
    print("="*60)
    print(f"样本大小: {results['sample_size']}")
    print(f"测试大小: {results['test_size']}")
    print(f"误判率: {results['false_positive_rate']*100:.6f}% ({results['false_positives']}/{results['test_size']})")
    print(f"平均插入时间: {results['avg_insert_time']:.6f} 毫秒")
    print(f"平均查询时间(存在): {results['avg_query_existing_time']:.6f} 毫秒")
    print(f"平均查询时间(不存在): {results['avg_query_nonexisting_time']:.6f} 毫秒")
    print(f"插入吞吐量: {results['insert_throughput']:.2f} 操作/秒")
    print(f"查询吞吐量(存在): {results['query_existing_throughput']:.2f} 操作/秒")
    print(f"查询吞吐量(不存在): {results['query_nonexisting_throughput']:.2f} 操作/秒")
    print("="*60)

# 使用示例
if __name__ == "__main__":

    # 创建布隆过滤器实例
    capacity = 100000
    error_rate = 0.001
    # 没有过滤器模块的Redis服务配置
    redis_without_bloom_configs = {
      "host": '192.168.48.131', 
      "port": 6380, 
      "db": 0
    }
    # 有过滤器模块的Redis服务配置
    redis_with_bloom_configs = {
      "host": '192.168.48.131', 
      "port": 6379, 
      "db": 0
    }
    local_bloom_cls = [LocalBloomFilter, BloomFilter, ScalableBloomFilter]
    redis_bloom_cls = [RedisLegacyBloomFilter, RedisBloomFilter]
    bloom_filter_instances = {}
    for cls in local_bloom_cls:
      bloom_filter_instances[str(cls)] = cls(capacity, error_rate)
    
    r_client = redis.Redis(
      host=redis_without_bloom_configs['host'], 
      port=redis_without_bloom_configs['port'], 
      db=redis_without_bloom_configs['db']
      )
    bloom_filter_instances[str(RedisLegacyBloomFilter)] = RedisLegacyBloomFilter(r_client, "legacy_bloom_filter", capacity, error_rate)
    
    rb_client = Client(
      host=redis_with_bloom_configs['host'], 
      port=redis_with_bloom_configs['port'], 
      db=redis_with_bloom_configs['db']
      )
    filter_name = "redis_bloom_filter"
    bloom_filter_instances[str(RedisBloomFilter)] = RedisBloomFilter(rb_client, "redis_bloom_filter", capacity, error_rate)
    for k, v in bloom_filter_instances.items():
      # 运行性能测试
      results = test_bloom_filter_performance(v, sample_size=100000, test_size=40000)
      # 打印摘要和图表
      print_summary(results, k)
      plot_performance(results, k.split(".")[-1][0:-2])

需要手动配置本地的Redis服务的地址与端口等数据,具体变量名为:

  1. redis_without_bloom_configs
  2. redis_with_bloom_configs

10.性能测试结果

Manual Redis Bloom Filter params: m=1437758, k=10
开始测试布隆过滤器性能...
过滤器参数: BloomFilter(位数组大小=1437758, 哈希函数数=10, 已添加元素≈0, 当前理论误判率≈0.000000)
样本大小: 100000, 测试大小: 40000
------------------------------------------------------------
生成测试数据...
测试插入性能...
总插入时间: 0.2611 秒
平均每次插入时间: 0.002455 毫秒
总插入吞吐量: 382990.05 操作/秒

测试查询性能(存在的元素)...  
总查询时间(存在元素): 0.1109 秒
平均每次查询时间(存在元素): 0.002614 毫秒
查询吞吐量(存在元素): 360814.31 操作/秒  

测试查询性能(不存在的元素)...
总查询时间(不存在元素): 0.0967 秒
平均每次查询时间(不存在元素): 0.002260 毫秒
查询吞吐量(不存在元素): 413822.08 操作/秒
误判数量: 30/40000
实际误判率: 0.000750 (0.0750%)

============================================================
布隆过滤器性能测试摘要 - <class 'utils.bloom_local_byself.LocalBloomFilter'>
============================================================
样本大小: 100000
测试大小: 40000
误判率: 0.075000% (30/40000)
平均插入时间: 0.002455 毫秒
平均查询时间(存在): 0.002614 毫秒
平均查询时间(不存在): 0.002260 毫秒
插入吞吐量: 382990.05 操作/秒
查询吞吐量(存在): 360814.31 操作/秒
查询吞吐量(不存在): 413822.08 操作/秒
============================================================
开始测试布隆过滤器性能...
过滤器参数: <pybloom_live.pybloom.BloomFilter object at 0x0000026A90217CA0>
样本大小: 100000, 测试大小: 40000
------------------------------------------------------------
生成测试数据...
测试插入性能...
总插入时间: 0.3619 秒
平均每次插入时间: 0.003461 毫秒
总插入吞吐量: 276344.30 操作/秒

测试查询性能(存在的元素)...
总查询时间(存在元素): 0.1320 秒
平均每次查询时间(存在元素): 0.003122 毫秒
查询吞吐量(存在元素): 303089.50 操作/秒

测试查询性能(不存在的元素)...
总查询时间(不存在元素): 0.0850 秒
平均每次查询时间(不存在元素): 0.001973 毫秒
查询吞吐量(不存在元素): 470420.45 操作/秒
误判数量: 38/40000
实际误判率: 0.000950 (0.0950%)

============================================================
布隆过滤器性能测试摘要 - <class 'pybloom_live.pybloom.BloomFilter'>
============================================================
样本大小: 100000
测试大小: 40000
误判率: 0.095000% (38/40000)
平均插入时间: 0.003461 毫秒
平均查询时间(存在): 0.003122 毫秒
平均查询时间(不存在): 0.001973 毫秒
插入吞吐量: 276344.30 操作/秒
查询吞吐量(存在): 303089.50 操作/秒
查询吞吐量(不存在): 470420.45 操作/秒
============================================================
开始测试布隆过滤器性能...
过滤器参数: <pybloom_live.pybloom.ScalableBloomFilter object at 0x0000026AB21F5C60>
样本大小: 100000, 测试大小: 40000
------------------------------------------------------------
生成测试数据...
测试插入性能...
总插入时间: 0.5860 秒
平均每次插入时间: 0.005695 毫秒
总插入吞吐量: 170644.31 操作/秒

测试查询性能(存在的元素)...
总查询时间(存在元素): 0.1462 秒
平均每次查询时间(存在元素): 0.003485 毫秒
查询吞吐量(存在元素): 273591.23 操作/秒

测试查询性能(不存在的元素)...
总查询时间(不存在元素): 0.0981 秒
平均每次查询时间(不存在元素): 0.002287 毫秒
查询吞吐量(不存在元素): 407684.00 操作/秒
误判数量: 31/40000
实际误判率: 0.000775 (0.0775%)

============================================================
布隆过滤器性能测试摘要 - <class 'pybloom_live.pybloom.ScalableBloomFilter'>
============================================================
样本大小: 100000
测试大小: 40000
误判率: 0.077500% (31/40000)
平均插入时间: 0.005695 毫秒
平均查询时间(存在): 0.003485 毫秒
平均查询时间(不存在): 0.002287 毫秒
插入吞吐量: 170644.31 操作/秒
查询吞吐量(存在): 273591.23 操作/秒
查询吞吐量(不存在): 407684.00 操作/秒
============================================================
开始测试布隆过滤器性能...
过滤器参数: <utils.bloom_redis_without_bloom.RedisLegacyBloomFilter object at 0x0000026AB21F5D80>
样本大小: 100000, 测试大小: 40000
------------------------------------------------------------
生成测试数据...
测试插入性能...
总插入时间: 56.5457 秒
平均每次插入时间: 0.564914 毫秒
总插入吞吐量: 1768.48 操作/秒

测试查询性能(存在的元素)...
总查询时间(存在元素): 23.1157 秒
平均每次查询时间(存在元素): 0.577352 毫秒
查询吞吐量(存在元素): 1730.43 操作/秒

测试查询性能(不存在的元素)...
总查询时间(不存在元素): 23.2851 秒
平均每次查询时间(不存在元素): 0.581571 毫秒
查询吞吐量(不存在元素): 1717.84 操作/秒
误判数量: 51/40000
实际误判率: 0.001275 (0.1275%)

============================================================
布隆过滤器性能测试摘要 - <class 'utils.bloom_redis_without_bloom.RedisLegacyBloomFilter'>
============================================================
样本大小: 100000
测试大小: 40000
误判率: 0.127500% (51/40000)
平均插入时间: 0.564914 毫秒
平均查询时间(存在): 0.577352 毫秒
平均查询时间(不存在): 0.581571 毫秒
插入吞吐量: 1768.48 操作/秒
查询吞吐量(存在): 1730.43 操作/秒
查询吞吐量(不存在): 1717.84 操作/秒
============================================================
开始测试布隆过滤器性能...
过滤器参数: <utils.bloom_redis_with_bloom.RedisBloomFilter object at 0x0000026AB21F5E40>
样本大小: 100000, 测试大小: 40000
------------------------------------------------------------
生成测试数据...
测试插入性能...
总插入时间: 49.4930 秒
平均每次插入时间: 0.494393 毫秒
总插入吞吐量: 2020.49 操作/秒

测试查询性能(存在的元素)...
总查询时间(存在元素): 19.8955 秒
平均每次查询时间(存在元素): 0.496794 毫秒
查询吞吐量(存在元素): 2010.50 操作/秒

测试查询性能(不存在的元素)...
总查询时间(不存在元素): 19.3901 秒
查询吞吐量(不存在元素): 2062.91 操作/秒
误判数量: 12/40000
实际误判率: 0.000300 (0.0300%)

============================================================
布隆过滤器性能测试摘要 - <class 'utils.bloom_redis_with_bloom.RedisBloomFilter'>
============================================================
样本大小: 100000
测试大小: 40000
误判率: 0.030000% (12/40000)
平均插入时间: 0.494393 毫秒
平均查询时间(存在): 0.496794 毫秒
插入吞吐量: 2020.49 操作/秒
查询吞吐量(存在): 2010.50 操作/秒
查询吞吐量(不存在): 2062.91 操作/秒
============================================================

11.代码下载

点击下载:BloomFilters.7z