使用thrift搭建布隆过滤器服务
一.bloom filter(布隆过滤器) 因为可能有海量的爬取任务,所以需要一个排重服务支持,BloomFilter是一个基于hash+概率的比较省空间的算法。大概的意思就是:将一个key 多次hash然后填充到相应的位,判断一个key是否存在就是看这个key的每个hash到的位是否标识。当然hash的次数是有讲究的,与hash空间的大小和待hash的item数量有关系,wiki里面也讲解了如果选取hash个数使得误判率最低。这篇文章写得比较清楚:http://www.cnblogs.com/allensun/archive/2011/02/16/1956532.html
下面是Bloom Filter的java实现的代码: 自己实现了一个bitset(名字是抄C++ STL的),因为jvm的数组只能用int型作为size,所以只能开这么大了。。不知道64位的是不是可以用long
package co.huohua.BloomFilter;
public class BitSet {
private long length;
private byte[] data;
private int k = 8;
public BitSet(long size)
{
length = size;
int t = (int)(size/8);
if (size%8 >0) t+=1;
data = new byte[t];
}
public void set(long pos)
{
int p = (int)(pos/k);
int pp = (int)(pos%k);
data[p] |= 1<<pp;
}
public boolean get(long pos)
{
int p = (int)(pos/k);
int pp = (int)(pos%k);
return (data[p] & (1<<pp)) > 0;
}
}
下面是BloomFilter的核心,hash函数的生成我是用一大堆质数每次类乘实现的。因为习惯问题,数据结构总喜欢写成模板(java里面叫泛型)。
package co.huohua.BloomFilter;
import java.lang.Math;
import co.huohua.BloomFilter.BitSet;
public class BloomFilter<T extends Object>{
private BitSet bitSet = null;
private long bitnum;
private int f_num;
private int seed[] = new int[]{31,37,41,43,47,53,59,61,67,71,73,79,83,89,97,101,103,109,113,127,131,137,139,149,151,157,163,167,173,179,181,191,193,197,199,211,223,227,229,233,239,241,251,257,263,269,271,277,281,283,293,307,311,313,317,331,337,347,349,353,359,367,373,379,383,389,397,401,409,419,421,431,433,439,443,449,457,461,463,467,479,487,491,499,503,509,521,523,541,547,557,563,569,571,577,587,593,599,601,607,613,617,619,631,641,643,647,653,659,661,673,677,683,691,701,709,719,727,733,739,743,751,757,761,769,773,787,797,809,811,821,823,827,829,839,853,857,859,863,877,881,883,887,907,911,919,929,937,941,947,953,967,971,977,983,991,997};
public BloomFilter(long bitnum,long expect_num) throws Exception
{
//xxxx
if (bitnum > 1L<<33) throw new Exception("bitnum at most 1L<<33");
//xxxx
this.bitnum = bitnum;
int efn = calc_f_num(this.bitnum,expect_num);
this.f_num = efn > seed.length ? seed.length : efn;
this.bitSet = new BitSet(this.bitnum);
System.out.println(efn);
System.out.println(calc_error(this.bitnum,expect_num));
}
private int calc_f_num(long bitnum,long expect_num)
{
return (int)(Math.log(2)*bitnum/expect_num);
}
private double calc_error(long bitnum,long expect_num)
{
return Math.pow(2, -1*Math.log(2)*bitnum/expect_num);
}
private long hash(String line,int n)
{
long h = 0;
int len = line.length();
for (int i = 0; i < len; i++)
{
h = n * h + line.charAt(i);
}
return check(h);
}
private long check(long h) {
return Math.abs(h % this.bitnum);
}
public void put(T item)
{
for(int i=0;i<f_num;++i)
{
String s = item.toString();
bitSet.set(hash(s,seed[i]));
}
}
public boolean test(T item)
{
for(int i=0;i<f_num;++i)
{
String s = item.toString();
if (!bitSet.get(hash(s,seed[i]))) return false;
}
return true;
}
@Override
public String toString()
{
return this.bitSet.toString();
}
public static void main(String []args) throws Exception
{
BloomFilter<String> a = new BloomFilter<String>(1L<<33,500000000);
a.put("a");
System.out.println(a.test("a"));
System.out.println(a.test("b"));
System.out.println(a.test("c"));
a.put("afsdfasdfasdfasdfasdfsdgdfhdfyetjkhkylk46568yh.lmndrstgfkhl,jh");
System.out.println(a.test("afsdfasdfasdfasdfasdfsdgdfhdfyetjkhkylk46568yh.lmndrstgfkhl,jh"));
System.out.println(a.test("bfsdfasdfasdfasdf"));
System.out.println(a.test("cfasdfsdghjgkuhlk"));
}
}
二.thrift thrift是facebook的一个内部程序通讯框架,用来解决跨语言的问题。thrift的实现就是:1.先规定好通讯的数据格式,和接口格式。2.thrift根据你选择的语言,生成一份这些语言的框架代码,一般情况下,这份代码里面会有2个抽象类(一个是client类,一个是server类),类里面的方法就是第一步中规定的接口。然后你自己implement一下server类,实现一下这些接口方法就行了~然后就可以像调用函数一样从client类里面调用其他程序的服务了。
这是接口的定义:bloomfilter.thrift
service BloomFilterThrift
{
bool query(1:string s);
void add(1:string s);
bool queryAndAdd(1:string s);
void store();
}
使用thrift生成代码: thrift –gen py java bloomfilter.thrift
会生成gen-java 和gen-py的目录,然后将生成的代码放到你的工程里面就行啦~
下面是java实现的server类
package co.huohua.ThriftServer;
import org.apache.thrift.TException;
import co.huohua.BloomFilter.BloomFilter;
public class BloomFilterThriftImpl implements BloomFilterThrift.Iface {
BloomFilter bf = null;
public BloomFilterThriftImpl(long bit_num,long expect_num) throws Exception
{
bf = new BloomFilter(bit_num,expect_num);
}
@Override
public boolean query(String s) throws TException {
// TODO Auto-generated method stub
return bf.test(s);
}
@Override
public void add(String s) throws TException {
bf.put(s);
}
@Override
public boolean queryAndAdd(String s) throws TException {
boolean r = bf.test(s);
bf.put(s);
return r;
}
@Override
public void store() throws TException {
// TODO Auto-generated method stub
}
}
然后在主程序里面启动server就ok了~可以监听端口,提供服务了:
package co.huohua.ThriftServer;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;
import co.huohua.ThriftServer.BloomFilterThriftImpl;
public class Main {
private void run(long bitnum,long expectnum) throws Exception
{
//构造transport类
TServerSocket transport = new TServerSocket(9090);
//构造 处理器
TProcessor processor = new BloomFilterThrift.Processor(new BloomFilterThriftImpl(bitnum,expectnum));
//构造传输协议工厂类
Factory proFactory = new TBinaryProtocol.Factory();
//构造 Server的构造函数需要的参数
TThreadPoolServer.Args argss = new TThreadPoolServer.Args(transport);
argss.inputProtocolFactory(proFactory);
argss.outputProtocolFactory(proFactory);
argss = argss.processor(processor);
argss.maxWorkerThreads = 10;
TServer server = new TThreadPoolServer(argss);
System.out.println("server running...");
server.serve();
}
public static void main(String[] args) throws Exception {
if (args.length != 2)
{
System.out.println("java -jar xxx.jar bitnum expectnum");
return;
}
long bitnum = Integer.parseInt(args[0]);
long expectnum = Integer.parseInt(args[1]);
Main main = new Main();
main.run(bitnum,expectnum);
}
}
可以看到代码里面,一层层的组件了很多,其实每一层都是有很多可选的组件的,比如传输可以选择二进制或者json呀,服务器可以用ThreadPool或者进程池等等。具体要仔细看下doc,而且不同的语言并不相同,因为可能有些语言某些组件并不容易实现吧。
接下来展示用python如何调用这个服务:
import sys
sys.path.append('./gen-py')
from bloomfilter import BloomFilterThrift
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
try:
transport = TSocket.TSocket('127.0.0.1', 9090)
transport = TTransport.TBufferedTransport(transport)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = BloomFilterThrift.Client(protocol)
transport.open()
print client.query('dingyaguang117')
transport.close()
except Thrift.TException, ex:
print "%s" % (ex.message)
其中BloomFilterThrift.Client就是生成的Client类啦!
当时在学thrift的时候看了一下,用的公司除了facebook,还有人人网啊,evernote。。貌似并不是很多~ 可能还有其他优秀的框架吧~~ 希望本文对大家学习Thrift有帮助~
注:后来测试了一下thrift下的BloomFilter,速度非常快,本来以为hash会略慢。误判率和理论值非常接近,当然,数据是慢慢填进去的,在一开始的很长一段时间,都是没有误判的,后来逐渐上升到5%。。。
Back