Building a Bloom Filter Service with Thrift

08 Oct 2012

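1. Bloom filter

Because there may be a huge number of crawl tasks, a deduplication service is needed. A Bloom filter is a space-efficient probabilistic data structure built on hashing. Roughly: a key is hashed several times and the corresponding bits are set; to check whether a key exists, you look at whether every bit it hashes to is set. If any of those bits is unset, the key has definitely never been added; if all are set, it has probably been added (false positives are possible, false negatives are not). The number of hash functions matters: it depends on the size of the bit array (m) and the number of items to insert (n), and Wikipedia explains how to choose it so that the false-positive rate is minimized (k = (m/n) ln 2). This article explains it clearly: http://www.cnblogs.com/allensun/archive/2011/02/16/1956532.html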
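To make the sizing rule concrete, here is a minimal sketch of the standard approximations for m bits and n inserted items: the optimal number of hash functions k = (m/n) ln 2, and the false-positive probability p ≈ (1 - e^(-kn/m))^k. The class and method names (BloomMath, optimalK, falsePositiveRate) are hypothetical and not part of the post's code.

```java
// Hypothetical helper, not part of the post's code: standard Bloom filter sizing.
// m = number of bits, n = number of inserted items, k = number of hash functions.
class BloomMath {
    // Optimal number of hash functions, k = (m / n) * ln 2, rounded and at least 1.
    static int optimalK(long m, long n) {
        return (int) Math.max(1L, Math.round((double) m / n * Math.log(2)));
    }

    // Approximate false-positive probability after n insertions: (1 - e^(-k*n/m))^k.
    static double falsePositiveRate(long m, long n, int k) {
        return Math.pow(1 - Math.exp(-(double) k * n / m), k);
    }

    public static void main(String[] args) {
        long m = 1L << 33;      // 2^33 bits (1 GiB), the size used in the post's main()
        long n = 500_000_000L;  // expected item count used in the post's main()
        int k = optimalK(m, n);
        System.out.println("k = " + k + ", p = " + falsePositiveRate(m, n, k));
    }
}
```

With the post's numbers this rounds to k = 12 and gives p of roughly 0.00026; the post's own calc_f_num truncates instead of rounding and therefore uses 11.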

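Below is the Java implementation of the Bloom filter. I implemented my own bitset (the name is borrowed from the C++ STL's bitset). Java array lengths and indexes are ints, even on a 64-bit JVM, so a byte[] holds at most about 2^31 bytes, i.e. roughly 2^34 bits; that is as large as this BitSet can get.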

package co.huohua.BloomFilter;

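public class BitSet {
    
    private long length;      // number of bits
    private byte[] data;      // bit storage, 8 bits per byte
    private int k = 8;        // bits per byte
    
    public BitSet(long size)
    {
        length = size;
        // round up to whole bytes; the byte[] length must fit in an int
        int t = (int)(size/8);
        if (size%8 >0) t+=1;
        data = new byte[t];
    }
    
    // set bit `pos`: byte index pos/8, bit offset pos%8
    public void set(long pos)
    {
        int p = (int)(pos/k);
        int pp = (int)(pos%k);
        data[p] |= 1<<pp;
    }
    // test bit `pos`
    public boolean get(long pos)
    {
        int p = (int)(pos/k);
        int pp = (int)(pos%k);
        return (data[p] & (1<<pp)) != 0;
    }

}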
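Because a Java array's length is always an int whatever the element type, the practical way to address more bits is a wider element. A minimal sketch (LongBitSet is a hypothetical name, not the post's class) that stores 64 bits per long raises the ceiling from about 2^34 bits to about 2^37 bits, and replaces the divide and modulo with a shift and a mask:

```java
// Hypothetical alternative to the post's byte[]-backed BitSet: same idea,
// but each array slot is a 64-bit long instead of an 8-bit byte.
class LongBitSet {
    private final long[] words;

    LongBitSet(long nbits) {
        long nwords = (nbits + 63) >>> 6;          // ceil(nbits / 64)
        if (nwords > Integer.MAX_VALUE) {          // array length must fit in an int
            throw new IllegalArgumentException("too many bits: " + nbits);
        }
        words = new long[(int) nwords];
    }

    // word index = pos / 64, bit within the word = pos % 64
    void set(long pos) {
        words[(int) (pos >>> 6)] |= 1L << (pos & 63);
    }

    boolean get(long pos) {
        return (words[(int) (pos >>> 6)] & (1L << (pos & 63))) != 0;
    }
}
```

Note that 2^37 bits is 16 GiB, so the heap would have to be sized accordingly.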

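Below is the core of the BloomFilter. The hash functions come from a long list of primes: each prime serves as the multiplier of a polynomial string hash (h = h * prime + next character). Out of habit I write data structures as templates (generics, in Java).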

package co.huohua.BloomFilter;

public class BloomFilter<T>{
    
    private BitSet bitSet = null;
    private long bitnum;
    private int f_num;
    private int seed[] = new int[]{31,37,41,43,47,53,59,61,67,71,73,79,83,89,97,101,103,109,113,127,131,137,139,149,151,157,163,167,173,179,181,191,193,197,199,211,223,227,229,233,239,241,251,257,263,269,271,277,281,283,293,307,311,313,317,331,337,347,349,353,359,367,373,379,383,389,397,401,409,419,421,431,433,439,443,449,457,461,463,467,479,487,491,499,503,509,521,523,541,547,557,563,569,571,577,587,593,599,601,607,613,617,619,631,641,643,647,653,659,661,673,677,683,691,701,709,719,727,733,739,743,751,757,761,769,773,787,797,809,811,821,823,827,829,839,853,857,859,863,877,881,883,887,907,911,919,929,937,941,947,953,967,971,977,983,991,997};
    public BloomFilter(long bitnum,long expect_num) throws Exception
    {
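        // cap the size (2^33 bits = 1 GiB); the BitSet's byte[] must also have an int length
        if (bitnum > 1L<<33) throw new Exception("bitnum at most 1L<<33");
        
        // optimal number of hash functions, capped by the number of prime seeds
        this.bitnum = bitnum;
        int efn =  calc_f_num(this.bitnum,expect_num);
        this.f_num = efn > seed.length ? seed.length : efn;
        this.bitSet = new BitSet(this.bitnum);
        
        System.out.println("hash functions: " + this.f_num);
        System.out.println("expected false-positive rate: " + calc_error(this.bitnum,expect_num));
    }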
    
    // k = (m/n) ln 2; at least 1, otherwise test() would return true for everything
    private int calc_f_num(long bitnum,long expect_num)
    {
        return Math.max(1, (int)(Math.log(2)*bitnum/expect_num));
    }
    
    // false-positive rate at the optimal k: 2^(-k) with k = (m/n) ln 2
    private double calc_error(long bitnum,long expect_num)
    {
        return Math.pow(2, -1*Math.log(2)*bitnum/expect_num);
    }
    
    // polynomial string hash with multiplier n (one of the prime seeds), mapped into [0, bitnum)
    private long hash(String line,int n) 
    {  
        long h = 0;  
        int len = line.length();  
        for (int i = 0; i < len; i++)
        {  
            h = n * h + line.charAt(i);  
        }  
        return check(h);  
    }  
    private long check(long h) {  
        return Math.abs(h % this.bitnum);  
    }  
    
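    public void put(T item)
    {
        String s = item.toString();
        // set the bit chosen by each of the f_num hash functions
        for(int i=0;i<f_num;++i)
        {
            bitSet.set(hash(s,seed[i]));
        }
    }
    
    public boolean test(T item)
    {
        String s = item.toString();
        // probably present only if every hashed bit is set; one unset bit means definitely absent
        for(int i=0;i<f_num;++i)
        {
            if (!bitSet.get(hash(s,seed[i]))) return false;
        }
        return true;
    }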
    
    @Override
    public String toString()
    {
        
        return this.bitSet.toString();
    }
    
    public static void main(String []args) throws Exception
    {
        
        // 2^33 bits = 1 GiB of heap for the bit array; run with e.g. -Xmx2g
        BloomFilter<String> a = new BloomFilter<String>(1L<<33,500000000);
        
        a.put("a");
        System.out.println(a.test("a"));
        System.out.println(a.test("b"));
        System.out.println(a.test("c"));
        
        a.put("afsdfasdfasdfasdfasdfsdgdfhdfyetjkhkylk46568yh.lmndrstgfkhl,jh");
        System.out.println(a.test("afsdfasdfasdfasdfasdfsdgdfhdfyetjkhkylk46568yh.lmndrstgfkhl,jh"));
        System.out.println(a.test("bfsdfasdfasdfasdf"));
        System.out.println(a.test("cfasdfsdghjgkuhlk"));
    }
}
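One property of the prime-seed hash above is worth knowing: for a one-character string the loop runs once and returns h = n * 0 + c = c for every seed n, so all f_num positions collapse onto the same bit (put("a") sets only bit 97). A commonly used alternative, named here plainly because it is not what the post does, is double hashing after Kirsch and Mitzenmacher: compute two base hashes h1 and h2 and use g_i = h1 + i * h2 mod m. This sketch assumes CRC32 and 64-bit FNV-1a as the two base hashes; the class name DoubleHashIndexes is hypothetical.

```java
// Hypothetical sketch of double hashing for a Bloom filter (Kirsch & Mitzenmacher):
// k bit positions g_i = h1 + i * h2 (mod m) from two base hashes.
// This is a swapped-in technique, not the post's prime-seed hash.
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

class DoubleHashIndexes {
    static long[] indexes(String s, int k, long m) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        CRC32 crc = new CRC32();
        crc.update(bytes, 0, bytes.length);
        long h1 = crc.getValue();          // first base hash: CRC-32
        long h2 = fnv1a64(bytes) | 1;      // second base hash: FNV-1a, forced odd
        long[] out = new long[k];
        for (int i = 0; i < k; i++) {
            out[i] = Math.floorMod(h1 + i * h2, m);   // always in [0, m)
        }
        return out;
    }

    // 64-bit FNV-1a hash of a byte array
    static long fnv1a64(byte[] data) {
        long h = 0xcbf29ce484222325L;
        for (byte b : data) {
            h ^= (b & 0xff);
            h *= 0x100000001b3L;
        }
        return h;
    }
}
```

Forcing h2 to be odd guarantees k distinct positions only when m is a power of two, as with the post's 1L<<33; for other sizes some of the k positions may coincide.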

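2. Thrift

Thrift is a cross-language RPC framework originally developed at Facebook (now Apache Thrift). It works like this: 1. You first define the data formats and the service interfaces. 2. Thrift generates framework code for the languages you choose. For Java, the generated BloomFilterThrift class contains an Iface interface with the methods defined in step 1, a Client class and a Processor class. You implement Iface on the server side, and then other programs can call your service through the Client class just like calling a function.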
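The division of labour between the interface, the client and the processor can be illustrated without Thrift at all. The following is a toy, in-process model: every name in it (ToyIface, SetHandler, ToyProcessor, ToyClient) is hypothetical, and it is neither Thrift's API nor its wire format. The client encodes a method name and argument into bytes, the processor decodes them and calls the handler, and the reply travels back as bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.Set;

// Toy model of the Iface / Client / Processor split; all names are hypothetical.
interface ToyIface {
    boolean query(String s);
    void add(String s);
}

// The part you write yourself (a HashSet stands in for the Bloom filter).
class SetHandler implements ToyIface {
    private final Set<String> seen = new HashSet<>();
    public boolean query(String s) { return seen.contains(s); }
    public void add(String s) { seen.add(s); }
}

// Server side: decode a request, call the handler, encode the reply.
class ToyProcessor {
    private final ToyIface handler;
    ToyProcessor(ToyIface handler) { this.handler = handler; }

    byte[] process(byte[] request) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(request));
        String method = in.readUTF();
        String arg = in.readUTF();
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        switch (method) {
            case "query": out.writeBoolean(handler.query(arg)); break;
            case "add":   handler.add(arg); break;
            default: throw new IOException("unknown method: " + method);
        }
        out.flush();
        return buf.toByteArray();
    }
}

// Client side: looks like a local object, but each call is encoded to bytes and
// handed to a "transport" (here simply a direct call into the processor).
class ToyClient implements ToyIface {
    private final ToyProcessor transport;
    ToyClient(ToyProcessor transport) { this.transport = transport; }

    private DataInputStream call(String method, String arg) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            out.writeUTF(method);
            out.writeUTF(arg);
            out.flush();
            return new DataInputStream(new ByteArrayInputStream(transport.process(buf.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public boolean query(String s) {
        try {
            return call("query", s).readBoolean();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public void add(String s) { call("add", s); }
}
```

In real Thrift the generated code plays the roles of ToyClient and ToyProcessor, and a socket transport plus a protocol such as TBinaryProtocol replace the in-memory byte arrays.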

This is the interface definition, bloomfilter.thrift:

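namespace java co.huohua.ThriftServer

service BloomFilterThrift
{
    // true if s is (probably) already in the filter
    bool query(1:string s);
    // add s to the filter
    void add(1:string s);
    // return whether s was (probably) present, then add it
    bool queryAndAdd(1:string s);
    // persist the filter
    void store();
}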

Generate the code with Thrift: thrift --gen py --gen java bloomfilter.thrift

This creates the gen-java and gen-py directories; just copy the generated code into your project.

Below is the server implementation in Java:

package co.huohua.ThriftServer;
import org.apache.thrift.TException;
import co.huohua.BloomFilter.BloomFilter;
 
public class BloomFilterThriftImpl implements BloomFilterThrift.Iface {
    BloomFilter<String> bf = null;
 
    public BloomFilterThriftImpl(long bit_num,long expect_num) throws Exception
    {
        bf = new BloomFilter<String>(bit_num,expect_num);
    }
 
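    // The thread-pool server calls these methods from several worker threads.
    // BitSet.set (data[p] |= ...) is not atomic, and queryAndAdd must test and
    // insert as one step, so all access to bf is serialized.
    @Override
    public synchronized boolean query(String s) throws TException {
        return bf.test(s);
    }
 
    @Override
    public synchronized void add(String s) throws TException {
        bf.put(s);
    }
 
    @Override
    public synchronized boolean queryAndAdd(String s) throws TException {
        // returns whether s was (probably) present before this call
        boolean r = bf.test(s);
        bf.put(s);
        return r;
    }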
 
    @Override
    public void store() throws TException {
        // TODO: persist the filter (not implemented yet)
    }
}

Then start the server in the main program, and it will listen on the port and serve requests:

package co.huohua.ThriftServer;
 
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
 
public class Main {
 
    private void run(long bitnum,long expectnum) throws Exception
    {
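        // transport: a server socket listening on port 9090
        TServerSocket transport = new TServerSocket(9090);
        // processor: dispatches incoming calls to the Iface implementation
        TProcessor processor = new BloomFilterThrift.Processor(new BloomFilterThriftImpl(bitnum,expectnum));
        // protocol factory: binary protocol for both input and output
        Factory proFactory = new TBinaryProtocol.Factory();
        // arguments for the server's constructor
        TThreadPoolServer.Args argss = new TThreadPoolServer.Args(transport);
        argss.inputProtocolFactory(proFactory);
        argss.outputProtocolFactory(proFactory);
        argss = argss.processor(processor);
        // at most 10 worker threads, i.e. up to 10 concurrent calls into the handler
        argss.maxWorkerThreads = 10;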
 
        TServer server = new TThreadPoolServer(argss);
        System.out.println("server running...");
        server.serve();
    }
 
    public static void main(String[] args) throws Exception {
 
        if (args.length != 2)
        {
            System.out.println("Usage: java -jar xxx.jar bitnum expectnum");
            return;
        }
        // bitnum may exceed Integer.MAX_VALUE (the filter accepts up to 1L<<33), so parse as long
        long bitnum = Long.parseLong(args[0]);
        long expectnum = Long.parseLong(args[1]);
 
        Main main = new Main();
        main.run(bitnum,expectnum);
    }
 
}

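As the code shows, the server is assembled from several layers, and each layer has interchangeable components: the protocol can be binary or JSON, the server can use a thread pool or a process pool, and so on. Read the documentation for details. The available components also differ between languages, probably because some of them are hard to implement in certain languages.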
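The thread-pool server matters for the Bloom filter itself: with maxWorkerThreads = 10, up to ten calls can run in the handler at once, and data[p] |= 1<<pp in the post's BitSet is a read-modify-write, so two threads setting different bits of the same byte can lose one update, which later shows up as a false negative. Making the handler methods synchronized is the simplest fix. A lock-free alternative is sketched here, assuming a Java 8+ runtime: ConcurrentBitSet (a hypothetical name) keeps the bits in an AtomicLongArray and sets them with compareAndSet.

```java
import java.util.concurrent.atomic.AtomicLongArray;

// Hypothetical lock-free bitset: each update is a compare-and-set on a 64-bit word,
// so concurrent set() calls on bits in the same word cannot overwrite each other.
class ConcurrentBitSet {
    private final AtomicLongArray words;

    ConcurrentBitSet(long nbits) {
        long nwords = (nbits + 63) >>> 6;   // ceil(nbits / 64)
        if (nwords > Integer.MAX_VALUE) throw new IllegalArgumentException("too many bits");
        words = new AtomicLongArray((int) nwords);
    }

    // Sets the bit and returns true if it was already set before this call.
    boolean set(long pos) {
        int idx = (int) (pos >>> 6);
        long mask = 1L << (pos & 63);
        while (true) {
            long old = words.get(idx);
            if ((old & mask) != 0) return true;                          // already set
            if (words.compareAndSet(idx, old, old | mask)) return false; // we set it
            // another thread changed this word in between: reload and retry
        }
    }

    boolean get(long pos) {
        return (words.get((int) (pos >>> 6)) & (1L << (pos & 63))) != 0;
    }

    // Demo helper: `threads` threads set the interleaved bits 0..n-1 at the same time.
    static void fillConcurrently(ConcurrentBitSet bs, long n, int threads) {
        Thread[] ts = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int offset = t;
            ts[t] = new Thread(() -> {
                for (long i = offset; i < n; i += threads) bs.set(i);
            });
            ts[t].start();
        }
        for (Thread th : ts) {
            try {
                th.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }
}
```

Because set() reports whether the bit was already set, queryAndAdd could be built from one pass of set() calls over the k positions. That is still not atomic across the k bits: two concurrent calls with the same new key can both report it as new, although no bit is ever lost.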

Next, here is how to call the service from Python:

import sys
sys.path.append('./gen-py')
 
from bloomfilter import BloomFilterThrift
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
 
try:
    transport = TSocket.TSocket('127.0.0.1', 9090)
    transport = TTransport.TBufferedTransport(transport)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
    client = BloomFilterThrift.Client(protocol)
    transport.open()
 
    print(client.query('dingyaguang117'))
 
    transport.close()
 
except Thrift.TException as ex:
    print('%s' % ex.message)

Here BloomFilterThrift.Client is the generated client class.

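When I was learning Thrift I looked at who uses it: besides Facebook there are Renren and Evernote, which does not seem like many. There may well be other good frameworks. I hope this post helps you learn Thrift.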

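Note: I later tested the BloomFilter running behind Thrift. It was very fast; I had expected the hashing to slow it down a little. The false-positive rate was very close to the theoretical value. The data was, of course, filled in gradually: for a long time at the beginning there were no false positives at all, and the rate later rose gradually to 5%.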
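The pattern in the note matches what the standard approximation predicts: for fixed m and k, the rate p(n) = (1 - e^(-kn/m))^k stays negligible while the filter is sparse and then climbs steeply as it fills. A minimal sketch (class and method names are hypothetical) that tabulates p(n) for the post's m = 2^33 and the k = 11 that its calc_f_num yields for 500,000,000 items, and inverts the formula to find the fill at which p reaches 5%:

```java
// Hypothetical sketch: approximate Bloom filter false-positive rate as the filter fills.
class FillCurve {
    // p(n) = (1 - e^(-k*n/m))^k for m bits, k hash functions, n inserted items
    static double fpRate(long m, double n, int k) {
        return Math.pow(1 - Math.exp(-k * n / m), k);
    }

    // Inverse of fpRate: the item count n at which the rate reaches p.
    static double itemsForRate(long m, int k, double p) {
        return -(double) m / k * Math.log(1 - Math.pow(p, 1.0 / k));
    }

    public static void main(String[] args) {
        long m = 1L << 33;  // bits, as in the post's main()
        int k = 11;         // what the post's calc_f_num gives for 500,000,000 items
        for (long n = 100_000_000L; n <= 1_500_000_000L; n += 100_000_000L) {
            System.out.printf("n = %,d  p = %.6f%n", n, fpRate(m, n, k));
        }
        System.out.printf("p reaches 5%% at about n = %,.0f%n", itemsForRate(m, k, 0.05));
    }
}
```

In this model the rate is below 10^-9 at 100,000,000 items and reaches 5% at roughly 1.1 × 10^9 items.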

Code repository