Analyzing Tens of Millions of Records in MongoDB

木木彡82  2013-11-24 11:04:02

Reposted from: http://my.oschina.net/tianyongke/blog/171172

All of the experiments below are statistical analyses over the entire data set; none of them look up individual records.

I. Import

Listing 1:

Read the CSV files and store their contents in the database.

#-*- coding:UTF-8 -*-
'''
Created on 2013-10-20

@author: tyk
'''
from pymongo.connection import Connection
from time import time
import codecs
import csv
import os

rootdir = "2000W/"                                   # the directory to walk

def process_data():
    conn = Connection('localhost', 27017) # get a connection
    ##conn.drop_database('guestHouse') 
    db = conn.TYK 
    guest = db.guestHouse              

    guest_info = []
    for parent, dirnames, filenames in os.walk(rootdir):    # os.walk yields three values: 1. the parent directory  2. all sub-directory names (without path)  3. all file names
        for filename in filenames:  
            ErrorLine = []
            key_length = 0
            fullname = os.path.join(parent, filename)
            try:
                #with codecs.open(fullname, encoding='utf_8') as file:
                with codecs.open(fullname, encoding='utf_8_sig') as file: # skip the BOM at the start of the UTF-8 file
                    keys = file.readline().strip().split(',') # consume the header line first; strip() keeps the last key free of the trailing newline
                    key_length = len(keys)
                    spamreader = csv.reader(file) # read as CSV; each row comes back as a list, not a str
                    for line in spamreader:
                        if key_length != len(line): # some rows are incomplete; record them
                            ErrorLine.append(line)
                        else:
                            each_info = {}
                            for i in range(1, len(keys)): # skip the first field, Name, so real names are not stored in the database
                                each_info[keys[i]] = line[i]

                            guest_info.append(each_info)
                            if len(guest_info) == 10000: # insert in batches of 10,000 rows
                                guest.insert(guest_info)  
                                guest_info = []
                                
            except Exception, e:
                print filename + "\t" + str(e)
                
            # write out the bad rows for this file in one place
            with open('ERR/' + os.path.splitext(filename)[0] + '-ERR.csv', 'w') as log_file:
                spamwriter = csv.writer(log_file)
                for line in ErrorLine:
                    spamwriter.writerow(line)
    # the final batch
    if guest_info: # pymongo raises on an empty bulk insert, so guard the last batch
        guest.insert(guest_info)  
    
if __name__ == '__main__':
    start = time()
    process_data()
    stop = time()
    print(str(stop - start) + " seconds")

Summary:

1. The files are UTF-8 encoded, so they cannot simply be opened with open() and read directly.

2. The files are stored in CSV format, so read them with the csv module; each row then comes back as a list. Note that you cannot simply split each line on "," — a row like "a,b,c", d would be parsed incorrectly that way (see the short sketch below).

3. For a UTF-8 file with a BOM, read it with the 'utf_8_sig' encoding, which skips the BOM at the beginning. If the BOM is not stripped, it gets stored in the database along with the data and produces keys that look like " XXX" (giving the illusion of a leading space).

References: http://docs.python.org/2/library/codecs.html#module-encodings.utf_8_sig

http://www.cnblogs.com/DDark/archive/2011/11/28/2266085.html
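
A quick illustration of point 2 (my own sketch, not from the original post), using a made-up row:

import csv

line = '"a,b,c",d'
print(line.split(','))              # ['"a', 'b', 'c"', 'd']  -- naive split: four pieces, wrong
print(next(csv.reader([line])))     # ['a,b,c', 'd']          -- csv module: two fields, correct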

If the BOM really has already been stored in the database, the only fix is to rename the key:

db.guestHouse.update({}, {"$rename" : {" Name" : "Name"}}, false, true)

There is also another approach mentioned online (my attempt at it failed; the reason is presumably that the string has to be converted to bytes before the comparison, and I don't know how to do that yet...):

#with codecs.open(fullname, encoding='utf-8') as file:
with codecs.open(fullname, encoding='utf_8_sig') as file:
	keys = file.readline().split(',')
	if keys[0][:3] == codecs.BOM_UTF8: # compare keys[0] against the BOM bytes; this never matches, because keys[0] is already a decoded string
		keys[0] = keys[0][3:]
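
For reference, a sketch of what a working comparison might look like, assuming Python 2 and codecs.open with fullname as in Listing 1: codecs.open returns unicode strings, so the three bytes of codecs.BOM_UTF8 decode to the single character u'\ufeff', and that is what has to be compared against.

import codecs

with codecs.open(fullname, encoding='utf_8') as f:   # plain utf_8, so the BOM is kept
    keys = f.readline().split(',')
    # after decoding, the BOM is one character, u'\ufeff' (== codecs.BOM_UTF8.decode('utf_8'))
    if keys and keys[0][:1] == u'\ufeff':
        keys[0] = keys[0][1:]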

Extension:

Today I discovered that MongoDB itself ships with an import tool, mongoimport, which can import CSV files directly...

A quick test.

1. Import directly, with no filtering of bad rows, using the patent citation data (the sample data set from Hadoop: The Definitive Guide).

Test data:

"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE","ASSIGNEE","ASSCODE","CLAIMS","NCLASS","CAT","SUBCAT","CMADE","CRECEIVE","RATIOCIT","GENERAL","ORIGINAL","FWDAPLAG","BCKGTLAG","SELFCTUB","SELFCTLB","SECDUPBD","SECDLWBD"
3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,
3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,
3070803,1963,1096,,"US",
"IL",,1,,2,6,63,,9,,0.3704,,,,,,,
3070804,1963,1096,,"US","OH",,1,,2,6,63,,3,,0.6667,,,,,,,
3070805,1963,1096,,"US","CA",,1,,2,6,63,,1,,0,,,,,,,
3070806,1963,1096,,"US","PA",,1,,2,6,63,,0,,,,,,,,,
3070807,1963,1096,,"US","OH",,1,,623,3,39,,3,,0.4444,,,,,,,
3070808,1963,1096,,"US","IA",,1,,623,3,39,,4,,0.375,,,,,,,
3070809,1963,1096,,,,1,,4,6,65,,0,,,,,,,,,
11 lines in total: the first line is the header, followed by 9 records. The 3rd record is broken in the middle by a line break, and the 9th record has its two middle values "US","AZ" removed. By CSV rules the file should therefore parse as 10 records.

Result:

> db.guest.find({}, {"PATENT" : 1, "_id" : 1})
{ "_id" : ObjectId("52692c2a0b082a1bbb727d86"), "PATENT" : 3070801 }
{ "_id" : ObjectId("52692c2a0b082a1bbb727d87"), "PATENT" : 3070802 }
{ "_id" : ObjectId("52692c2a0b082a1bbb727d88"), "PATENT" : 3070803 }
{ "_id" : ObjectId("52692c2a0b082a1bbb727d89"), "PATENT" : "IL" }
{ "_id" : ObjectId("52692c2a0b082a1bbb727d8a"), "PATENT" : 3070804 }
{ "_id" : ObjectId("52692c2a0b082a1bbb727d8b"), "PATENT" : 3070805 }
{ "_id" : ObjectId("52692c2a0b082a1bbb727d8c"), "PATENT" : 3070806 }
{ "_id" : ObjectId("52692c2a0b082a1bbb727d8d"), "PATENT" : 3070807 }
{ "_id" : ObjectId("52692c2a0b082a1bbb727d8e"), "PATENT" : 3070808 }
{ "_id" : ObjectId("52692c2a0b082a1bbb727d8f"), "PATENT" : 3070809 }
> db.guest.count()
10
>
Exactly 10 documents, so clearly this command does not filter out malformed data.

2. Try again with the file saved as UTF-8 with a BOM. Same test data.

> db.guest.find({}, {"PATENT" : 1, "_id" : 1})
{ "_id" : ObjectId("52692d730b082a1bbb727d90"), "PATENT" : 3070801 }
{ "_id" : ObjectId("52692d730b082a1bbb727d91"), "PATENT" : 3070802 }
{ "_id" : ObjectId("52692d730b082a1bbb727d92"), "PATENT" : 3070803 }
{ "_id" : ObjectId("52692d730b082a1bbb727d93"), "PATENT" : "IL" }
{ "_id" : ObjectId("52692d730b082a1bbb727d94"), "PATENT" : 3070804 }
{ "_id" : ObjectId("52692d730b082a1bbb727d95"), "PATENT" : 3070805 }
{ "_id" : ObjectId("52692d730b082a1bbb727d96"), "PATENT" : 3070806 }
{ "_id" : ObjectId("52692d730b082a1bbb727d97"), "PATENT" : 3070807 }
{ "_id" : ObjectId("52692d730b082a1bbb727d98"), "PATENT" : 3070808 }
{ "_id" : ObjectId("52692d730b082a1bbb727d99"), "PATENT" : 3070809 }
> db.guest.count()
10
The result is the same as above, and the "PATENT" key did not pick up a BOM-induced leading space.

3. mongoimport options

mongoimport -d TYK -c guest --type csv --file d:\text.csv --headerline
-d            database
-c            collection
--type        input format
--file        path to the input file
--headerline  apparently tells mongoimport to use the first line as the keys; alternatively, -f lets you list the keys yourself, e.g. "-f Name,age"

II. Statistical Analysis

1. Counting by gender

Since the data is not standardized, first check how many different values are used to represent gender:

db.runCommand({"distinct" : "guestHouse", "key" : "Gender"})
{
        "values" : [
                "M",
                "F",
                "0",
                " ",
                "1",
                "",
                "19790522",
                "#0449",
                "#M",
                "",
                "N"
        ],
        "stats" : {
                "n" : 20048891,
                "nscanned" : 20048891,
                "nscannedObjects" : 20048891,
                "timems" : 377764,
                "cursor" : "BasicCursor"
        },
        "ok" : 1
}

Gender is represented in 11 different ways... so let's just count M and F.
# total
db.guestHouse.count()
20048891
# male (M)
db.guestHouse.count({"Gender":"M"})
12773070
64%
# female (F)
db.guestHouse.count({"Gender":"F"})
6478745
32%
(Pie chart of the M/F split omitted.)

Summary:

1. count with a query condition is very slow. My guess is that count runs the query first; if so, an index on the queried field should help a lot. After adding an index on Gender the speed did improve noticeably, but it is still on the order of seconds, which is clearly not good enough for real-time use. Adding indexes casually also brings other problems, so striking a good balance with indexes really matters.
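
The post does not show how the Gender index was created; a hypothetical sketch with the same pymongo 2.x API as Listing 1 might look like this:

from pymongo.connection import Connection

guest = Connection('localhost', 27017).TYK.guestHouse
guest.ensure_index('Gender')                 # build a single-field index on Gender
print(guest.find({'Gender': 'M'}).count())   # the conditional count should now be able to use the index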

2013-10-24
Looking at the JS behind count:
> db.guestHouse.count
function ( x ){
    return this.find( x ).count();
}
>
So it really is a find first, then a count.
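
An alternative worth noting (not something the original post tried): the aggregation framework, available from MongoDB 2.2 / pymongo 2.3 on, can produce the whole Gender breakdown in a single pass instead of one count() per value. A minimal sketch, assuming the same connection as Listing 1:

from pymongo.connection import Connection

guest = Connection('localhost', 27017).TYK.guestHouse
# one $group pass counts every distinct Gender value at once
pipeline = [{'$group': {'_id': '$Gender', 'count': {'$sum': 1}}}]
print(guest.aggregate(pipeline))   # with pymongo 2.x the groups come back under the 'result' key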

2. Inferring gender from the ID-card number

Judging from the figures above, roughly 4% of the records have no usable gender.

A 15-digit ID number: digits 7-8 are the birth year (two digits), digits 9-10 the birth month, digits 11-12 the birth day, and digit 15 encodes the gender, odd for male and even for female. An 18-digit ID number: digits 7-10 are the birth year (four digits), digits 11-12 the birth month, digits 13-14 the birth day, and digit 17 encodes the gender, odd for male and even for female.
Analyzing gender from the ID number obviously cannot be done with a plain query, so let's try writing a MapReduce job for it, even though MapReduce on a single machine will be even slower.
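
The parity rule itself is simple; a small hypothetical helper (the ID number in the example is made up) shows the idea that the map function below implements:

def gender_from_id(ctf_id):
    if len(ctf_id) == 18:
        digit = ctf_id[16]        # the 17th character encodes gender
    elif len(ctf_id) == 15:
        digit = ctf_id[14]        # the 15th character encodes gender
    else:
        return None               # not a recognisable ID number
    return 'M' if int(digit) % 2 == 1 else 'F'

print(gender_from_id('110101198001011234'))   # => 'M' (the 17th digit, 3, is odd)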

First get to know the data a bit and see how many certificate types there are:

> db.runCommand({"distinct" : "guestHouse", "key" : "CtfTp"})
{
        "values" : [
                "OTH",
                "GID",
                "ID",
                "TBZ",
                "VSA",
                "TXZ",
                "JID",
                "JZ",
                "HXZ",
                "JLZ",
                "#ID",
                "hvz",
                "待定",
                "11",
                "",
                "SBZ",
                "JGZ",
                "HKB",
                "TSZ",
                "JZ1",
                " ",
                "Id",
                "#GID",
                "1"
        ],
        "stats" : {
                "n" : 20048891,
                "nscanned" : 20048891,
                "nscannedObjects" : 20048891,
                "timems" : 610036,
                "cursor" : "BasicCursor"
        },
        "ok" : 1
}
>

The data is just as messy, so for now let's run the statistics only on records whose certificate type is "ID".
>map = function() {
	if (this.CtfTp == "ID") {
		if (this.CtfId.length == 18){
			emit(parseInt(this.CtfId.charAt(16)) % 2, {count : 1}) // 1 = male, 0 = female
		}else if (this.CtfId.length == 15) { 
			emit(parseInt(this.CtfId.charAt(14)) % 2, {count : 1}) // NaN when the digit cannot be parsed
		}
	} else {
		emit(-1, {count : 1})
	}
}

>reduce = function(key, emits) {
	total = 0;
	for (var i in emits) {
		total += emits[i].count;
	}
	return {"count" : total};
}

>mr = db.runCommand(
               {
                 mapReduce: "guestHouse",
                 map: map,
                 reduce: reduce,
                 out: "TYK.guestHouse.output",
                 verbose: true
               }
             )
>{
        "result" : "guestHouse.output",
        "timeMillis" : 999097,
        "timing" : {
                "mapTime" : 777955,
                "emitLoop" : 995248,
                "reduceTime" : 111217,
                "mode" : "mixed",
                "total" : 999097
        },
        "counts" : {
                "input" : 20048891,
                "emit" : 19928098,
                "reduce" : 594610,
                "output" : 4
        },
        "ok" : 1
}

Result:
> db.guestHouse.output.find()
{ "_id" : NaN, "value" : { "count" : 1360 } }
{ "_id" : -1, "value" : { "count" : 1161164 } }
{ "_id" : 0, "value" : { "count" : 6831007 } }
{ "_id" : 1, "value" : { "count" : 11934567 } }
>
Summary:

1. It is no faster than a direct count({"Gender" : "M"}), but resource usage did not stand out: neither IO pressure nor CPU pressure was high.

2. The counts in the result add up to 19,928,098, which is 120,793 short of the total of 20,048,891. Where did those go?
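
My guess is that these are documents with CtfTp == "ID" whose CtfId is neither 15 nor 18 characters long; the map function emits nothing for them. A rough way to check that guess (slow, since $where runs JavaScript against every document, but it is a one-off):

from pymongo.connection import Connection

guest = Connection('localhost', 27017).TYK.guestHouse
odd_length = guest.find({
    'CtfTp': 'ID',
    '$where': 'this.CtfId && this.CtfId.length != 15 && this.CtfId.length != 18'
}).count()
print(odd_length)   # if the guess is right, this should be close to 120,793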

3. Statistics by province and region

Listing 2:

map = function() {
	//var idCard_reg = /(^\d{15}$)|(^\d{18}$)|(^\d{17}(\d|X|x)$)/; 
	//var idCard_reg = /(^[1-6]\d{14}$)|(^[1-6]\d{17}$)|(^[1-6]\d{16}(\d|X|x)$)/; 
	//((1[1-5])|(2[1-3])|(3[1-7])|(4[1-6])|(5[1-4])|(6[1-5]))
	var idCard_reg = /(^((1[1-5])|(2[1-3])|(3[1-7])|(4[1-6])|(5[1-4])|(6[1-5]))\d{13}$)|(^((1[1-5])|(2[1-3])|(3[1-7])|(4[1-6])|(5[1-4])|(6[1-5]))\d{16}$)|(^((1[1-5])|(2[1-3])|(3[1-7])|(4[1-6])|(5[1-4])|(6[1-5]))\d{15}(\d|X|x)$)/;
	if (this.CtfTp == "ID" && idCard_reg.test(this.CtfId)) {
		emit(this.CtfId.substr(0, 2), {count : 1}) // take the first two digits: the province / municipality / autonomous-region code
	} else {
		emit(-1, {count : 1})
	}
}

reduce = function(key, emits) {
	total = 0;
	for (var i in emits) {
		total += emits[i].count;
	}
	return {"count" : total};
}

mr  =   db.runCommand(
               {
                 mapReduce: "guestHouse",
                 map: map,
                 reduce: reduce,
                 out: "guestHouse.provinceOutput",
                 verbose: true
               }
             )

{
        "result" : "guestHouse.provinceOutput",
        "timeMillis" : 1173216,
        "timing" : {
                "mapTime" : 900703,
                "emitLoop" : 1169954,
                "reduceTime" : 157916,
                "mode" : "mixed",
                "total" : 1173216
        },
        "counts" : {
                "input" : 20048891,
                "emit" : 20048891,
                "reduce" : 1613854,
                "output" : 31
        },
        "ok" : 1
}

Reference for the ID-card number format: http://baike.baidu.com/view/188003.htm#1_2

Results:

> db.guestHouse.provinceOutput.find().sort({"value.count" : -1})
{ "_id" : "32", "value" : { "count" : 2398111 } } // Jiangsu
{ "_id" : -1, "value" : { "count" : 1670289 } } // unknown
{ "_id" : "37", "value" : { "count" : 1523357 } } // Shandong
{ "_id" : "33", "value" : { "count" : 1341274 } } // Zhejiang
{ "_id" : "41", "value" : { "count" : 1120455 } } // Henan
{ "_id" : "34", "value" : { "count" : 981943 } } // Anhui
{ "_id" : "42", "value" : { "count" : 974855 } } // Hubei
{ "_id" : "31", "value" : { "count" : 921046 } } // Shanghai
{ "_id" : "13", "value" : { "count" : 791432 } } // Hebei
{ "_id" : "21", "value" : { "count" : 754645 } } // Liaoning
{ "_id" : "14", "value" : { "count" : 689738 } } // Shanxi
{ "_id" : "51", "value" : { "count" : 664918 } } // Sichuan (incl. Chongqing)
{ "_id" : "36", "value" : { "count" : 594849 } } // Jiangxi
{ "_id" : "23", "value" : { "count" : 581882 } } // Heilongjiang
{ "_id" : "61", "value" : { "count" : 571792 } } // Shaanxi
{ "_id" : "35", "value" : { "count" : 571107 } } // Fujian
{ "_id" : "43", "value" : { "count" : 562536 } } // Hunan
{ "_id" : "44", "value" : { "count" : 558249 } } // Guangdong
{ "_id" : "11", "value" : { "count" : 495897 } } // Beijing
{ "_id" : "22", "value" : { "count" : 456159 } } // Jilin
{ "_id" : "15", "value" : { "count" : 392787 } } // Inner Mongolia
{ "_id" : "12", "value" : { "count" : 320711 } } // Tianjin
{ "_id" : "62", "value" : { "count" : 227366 } } // Gansu
{ "_id" : "45", "value" : { "count" : 192810 } } // Guangxi
{ "_id" : "52", "value" : { "count" : 187622 } } // Guizhou
{ "_id" : "65", "value" : { "count" : 145040 } } // Xinjiang
{ "_id" : "53", "value" : { "count" : 141652 } } // Yunnan
{ "_id" : "63", "value" : { "count" : 75509 } } // Qinghai
{ "_id" : "64", "value" : { "count" : 75105 } } // Ningxia
{ "_id" : "46", "value" : { "count" : 48279 } } // Hainan
{ "_id" : "54", "value" : { "count" : 17476 } } // Tibet


Analyze these results a step further, rolling the provinces up into regions.

db.guestHouse.provinceOutput.group({
		keyf : function(doc){ return {"key" : doc._id.substr(0,1)}; }, // group again by the first digit of the province code
		initial : {total : 0},
		reduce : function(curr, result){
				result.total += curr.value.count;
			},
		cond : {"_id" : {"$ne" : -1}},
		finalize : function(result) {
				// region names: North China, Northeast, East China, Central-South, Southwest, Northwest
				var areas = [ "华北", "东北", "华东",
						"中南", "西南",
						"西北" ];
				result.area = areas[result.key - 1];
			}
	})


[
        {
                "key" : "1",
                "total" : 2690565,
                "area" : "华北"
        },
        {
                "key" : "2",
                "total" : 1792686,
                "area" : "东北"
        },
        {
                "key" : "3",
                "total" : 8331687,
                "area" : "华东"
        },
        {
                "key" : "4",
                "total" : 3457184,
                "area" : "中南"
        },
        {
                "key" : "5",
                "total" : 1011668,
                "area" : "西南"
        },
        {
                "key" : "6",
                "total" : 1094812,
                "area" : "西北"
        }
]

group command reference:

http://docs.mongodb.org/manual/reference/method/db.collection.group/

http://docs.mongodb.org/manual/reference/command/group/#dbcmd.group
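
As an aside, the same regional rollup could presumably also be written with the aggregation framework instead of group. This is my own sketch, not from the post; it assumes a server whose aggregation framework supports $substr, and it leaves out the finalize step that maps each digit to a region name:

from pymongo.connection import Connection

db = Connection('localhost', 27017).TYK
pipeline = [
    {'$match': {'_id': {'$ne': -1}}},                      # drop the "unknown" bucket
    {'$group': {'_id': {'$substr': ['$_id', 0, 1]},        # first digit of the province code
                'total': {'$sum': '$value.count'}}},
    {'$sort': {'_id': 1}},
]
print(db.guestHouse.provinceOutput.aggregate(pipeline))    # groups are under 'result' with pymongo 2.x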


Questions and notes:

a. What I said earlier about MapReduce not using as much resource as count was wrong; it turns out Task Manager had simply stopped refreshing in real time ⊙﹏⊙b

b. The 'keyf' option of group is sometimes very useful (you can specify only one of key and keyf).

c. Does adding query : {"CtfTp" : "ID"} to the mapReduce command necessarily make it faster?

2013-10-29 22:57:00
See this follow-up test for details: http://my.oschina.net/tianyongke/blog/172794

d. Partial log

           22:10:47.001 [conn62] 		M/R: (1/3) Emit Progress: 19874400/20048891	99%
           22:10:50.000 [conn62] 		M/R: (1/3) Emit Progress: 19923500/20048891	99%
           22:10:53.005 [conn62] 		M/R: (1/3) Emit Progress: 19974700/20048891	99%
           22:10:56.603 [conn62] 		M/R: (1/3) Emit Progress: 20016000/20048891	99%
           22:10:59.001 [conn62] 		M/R: (1/3) Emit Progress: 20047200/20048891	99%
           22:11:02.052 [conn62] 		M/R: (3/3) Final Reduce Progress: 84500/112318	75%
           22:11:02.393 [conn62] CMD: drop TYK.guestHouse.provinceOutput
           22:11:02.531 [conn62] command admin.$cmd command: { renameCollection: "TYK.tmp.mr.guestHouse_9", to: "TYK.guestHouse.provinceOutput", stayTemp: false } ntoreturn:1 keyUpdates:0  reslen:37 136ms
           22:11:02.561 [conn62] CMD: drop TYK.tmp.mr.guestHouse_9
           22:11:02.587 [conn62] CMD: drop TYK.tmp.mr.guestHouse_9
           22:11:02.587 [conn62] CMD: drop TYK.tmp.mr.guestHouse_9_inc
           22:11:02.674 [conn62] CMD: drop TYK.tmp.mr.guestHouse_9
           22:11:02.690 [conn62] CMD: drop TYK.tmp.mr.guestHouse_9_inc
           22:11:02.894 [conn62] command TYK.$cmd command: { mapReduce: "guestHouse", map: function () {
//var idCard_reg = /(^\d{15}$)|(^\d{18}$)|(^\d{17}(\d|X|..., reduce: function (key, emits) {
total = 0;
for (var i in emits) {
total += emi..., out: "guestHouse.provinceOutput", verbose: true } ntoreturn:1 keyUpdates:0 numYields: 471197 locks(micros) W:233131 r:1922774271 w:20768395 reslen:232 1173522ms
           22:56:54.820 [conn62] command TYK.$cmd command: { group: { ns: "TYK.guestHouse.provinceOutput", $keyf: function (doc){return {temp_key : doc._id.subStr(0,1)}}, initial: { total: 0.0 }, $reduce: function (doc, prev){
prev.total += doc.value.count;
} } } ntoreturn:1 keyUpdates:0 locks(micros) r:542599 reslen:75 542ms
What the log shows is:

    1) map

    2) reduce

    3) drop the specified output collection (when re-running this job I used to do this step by hand first; apparently that is unnecessary)

    4) rename the temporary collection to the specified output collection

    5) drop the temporary collections (they are dropped repeatedly, and there is also an '_inc' collection; I don't know why)

    6) final processing: run the function specified by finalize

On the surface it looks like map runs first and then reduce, but what actually happens? Do they run in parallel? Also, reduce was executed as many as 1,613,854 times, yet judging by the log timestamps that stage took only about a second. Where does 1,613,854 come from, and what does it depend on?

