《python数据分析(第2版)-阿曼多.凡丹戈》读书笔记第12章-性能优化、性能分析与并发性及本书附录

时间:2020-04-07 09:31:10   收藏:0   阅读:85

python数据分析个人学习读书笔记-目录索引

第12章性能优化、性能分析与并发性

   过早优化是祸根。“

  ——Donald Knuth,著名的计算机科学家和数学家

  在实际的应用中,性能与功能特性、健壮性、可维护性、可测性以及可用性等是一样重要的。此外,性能与应用程序的可扩展性成正比。因此,性能是本书中一个不可或缺的一个话题。这也是我们把性能这个主题放到最后来讲的原因之一。在这里,我们将性能分析(Profling)作为关键技术,介绍如何改善软件的性能。对于分布式多核系统,我们也会介绍相应的性能优化框架。

  本章将讨论以下主题。

 

12.1代码的性能分析

   所谓性能分析,就是以收集程序运行时的信息为手段,找出代码中哪些部分较慢或者占用内存或处理器资源较多,以便进一步对这些代码做出相应的调整。这里,我们将以第9章中的情绪分析代码为蓝本,稍作修改后,以此为例进行性能分析。对于这个程序,我们将按照多进程编程准则对其进行重构。本章后面的部分将对多进程技术展开进一步讨论。

  此外,我们会对停用词的过滤做进一步简化。最后,我们还会在不降低准确性的前提下,来进一步减少作为特征的单词。对于最后这一点,效果最显著。原始代码的运行时间大约需要20s。而新的代码,其速度将会有明显提升,同时我们将它作为本章的比较基准。某些代码修改与性能分析有关,这将在本节后面讲解。下列代码摘自本书代码包中的prof_demo.py文件。

 1 import random
 2 from nltk import data
 3 data.path.append(rD:\download\nltk_data) # 这里的路径需要换成自己数据文件下载的路径
 4 from nltk.corpus import movie_reviews
 5 from nltk.corpus import stopwords
 6 from nltk import FreqDist
 7 from nltk import NaiveBayesClassifier
 8 from nltk.classify import accuracy
 9 
10 import builtins
11 
12 try:
13     profile = builtins.profile
14 except AttributeError:
15     def profile(func): 
16         return func
17 
18 @profile
19 def label_docs():
20     docs = [(list(movie_reviews.words(fid)), cat)
21             for cat in movie_reviews.categories()
22             for fid in movie_reviews.fileids(cat)]
23     random.seed(42)
24     random.shuffle(docs)
25 
26     return docs
27 
28 @profile
29 def isStopWord(word):
30     return word in sw or len(word) == 1
31 
32 @profile
33 def filter_corpus():
34     review_words = movie_reviews.words()
35     print("# Review Words", len(review_words))
36     res = [w.lower() for w in review_words if not isStopWord(w.lower())]
37     print("# After filter", len(res))
38 
39     return res
40 @profile
41 def select_word_features(corpus):
42     words = FreqDist(corpus)
43     N = int(.02 * len(words.keys()))
44     return list(words.keys())[:N]
45 
46 @profile
47 def doc_features(doc):
48     doc_words = FreqDist(w for w in doc if not isStopWord(w))
49     features = {}
50     for word in word_features:
51         features[count (%s) % word] = (doc_words.get(word, 0))
52     return features
53 
54 @profile
55 def make_features(docs):
56     return [(doc_features(d), c) for (d,c) in docs]
57 
58 @profile
59 def split_data(sets):
60     return sets[200:], sets[:200]
61 
62 if __name__ == "__main__":
63     labeled_docs = label_docs()
64 
65     sw = set(stopwords.words(english))
66     filtered = filter_corpus()
67     word_features = select_word_features(filtered)
68     featuresets = make_features(labeled_docs)
69     train_set, test_set = split_data(featuresets)
70     classifier = NaiveBayesClassifier.train(train_set)
71     print("Accuracy", accuracy(classifier, test_set))
72     print(classifier.show_most_informative_features())

  测量运行时间时,运行的进程越少越好。但是,我们不能保证后台没有进程运行,所以需要通过time命令测量3次运行时间并以最短的时间为准。各种操作系统和Cygwin下面都提供了time命令,使用方法如下。

1 $ time python D:\Java2018\PythonDataAnalysis2\Chapter12\prof_demo.py

  邀月注:windows下得用脚本处理,此处略。本文后面有更好方案替代。

  这样,我们会得到一个real类型的运行时间,这种测量方法采用的是时钟时间;对于user和sys类型的运行时间,则是通过CPU时间测量的程序运行时间。实际上,sys时间就是在内核中耗费的时间。笔者的电脑上测得的运行时间见表12-1,其中最小值用括号标示了出来。

时间类型 第一轮 第二轮 第三轮
Real 11.521 10.808 (10.416)
User 9.758 9.826 (9.444)
Sys 0.965 0.643 (0.620)

 

  下面利用Python内置的分析工具来分析代码,具体如下。

$ python -m cProfile -o D:\Java2018\PythonDataAnalysis2\Chapter12\log\stat.prof D:\Java2018\PythonDataAnalysis2\Chapter12\prof_demo.py

  这里,-o选项用来指定输出文件。此外,利用PyPi的程序包gprof2,可以实现分析工具输出结果的可视化。下面我们安装gprof2dot,方法如下。

1 $ pip3 install gprof2dot

  现在创建一个PNG格式的可视化图形,命令如下。

1 $ gprof2dot -f pstats D:\Java2018\PythonDataAnalysis2\Chapter12\log\stat.prof |dot -Tpng -o D:\Java2018\PythonDataAnalysis2\Chapter12\log\cprof.png

   提示:如果出现错误信息dot: command not found,表明我们尚未安装Graphviz。

  完整的图像很大,图12-1只展示其中一部分。

技术分享图片

 

  查询分析工具输出结果的方法如下。

1  python3 -m pstats D:\Java2018\PythonDataAnalysis2\Chapter12\log\stat.prof

  利用这个命令,我们可以把性能分析数据输入到浏览器中。下面我们删除输出结果中的文件名,然后对运行时间进行排序并输出前10个数据。

 

/* 
Welcome to the profile statistics browser.
D:\Java2018\PythonDataAnalysis2\Chapter12\log\stat.prof%
D:\Java2018\PythonDataAnalysis2\Chapter12\log\stat.prof% strip
D:\Java2018\PythonDataAnalysis2\Chapter12\log\stat.prof% sort time
D:\Java2018\PythonDataAnalysis2\Chapter12\log\stat.prof% stats 10
Mon Apr  6 20:56:25 2020    D:\Java2018\PythonDataAnalysis2\Chapter12\log\stat.prof

         34307447 function calls (33773715 primitive calls) in 15.954 seconds

   Ordered by: internal time
   List reduced from 4179 to 10 due to restriction <10>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
   319968    2.242    0.000    2.242    0.000 {method findall of re.Pattern objects}
  2839856    1.118    0.000    1.118    0.000 probability.py:127(__setitem__)
        1    0.944    0.944    1.708    1.708 naivebayes.py:193(train)
   319960    0.915    0.000    1.966    0.000 data.py:1192(readline)
     8006    0.776    0.000    0.776    0.000 {built-in method io.open}
        1    0.720    0.720    3.475    3.475 prof_demo.py:36(<listcomp>)
  6343280    0.642    0.000    7.965    0.000 util.py:271(iterate_from)
     2000    0.616    0.000    1.845    0.001 prof_demo.py:46(doc_features)
  3167640    0.616    0.000    0.718    0.000 prof_demo.py:28(isStopWord)
  3167642    0.475    0.000    4.506    0.000 util.py:408(iterate_from)
 */

  最终结果如上所示。

  表12-2对各个标题进行了简要说明。

标题 说明
Ncalls 调用次数
Tottime 某个函数的总耗时(不包括调用子函数所耗时间)
Percall 等于tottime/ncalls
Cumtime 表示该函数及其所有子函数的调用运行的时间,即函数开始调用到返回所经历的时间。这个数值非常准确,即便递归函数也不例外
Percall(第二个) 即函数运行一次的平均时间,等于cumtime/ncalls

 

  使用以下命令可以退出文件浏览器:

D:\Java2018\PythonDataAnalysis2\Chapter12\log\stat.prof% quit
Goodbye.

  安装和运行该分析工具的命令如下。

1 $ pip3 install line_profiler
2 $ kernprof -l -v D:\Java2018\PythonDataAnalysis2\Chapter12\prof_demo.py

      第一行安装报错,提示vs2017编译,这里官方下载:https://support.microsoft.com/en-gb/help/2977003/the-latest-supported-visual-c-downloads

      或者从这里下载buildTools工具:visualcppbuildtools_full

因为详尽的报告过于冗长,所以这里只给出每个函数的摘要信息。当然,这当中有部分重叠内容。

Function: label_docs at line 9 Total time: 6.19904 s
Function: isStopWord at line 19 Total time: 2.16542 s
File: prof_demo.py Function: filter_corpus at line 23
Function: select_word_features at line 32 Total time: 4.05266 s
Function: doc_features at line 38 Total time: 12.5919 s
Function: make_features at line 46 Total time: 14.566 s
Function: split_data at line 50 Total time: 3.6e-05 s

 

12.2安装Cython

   Cython程序语言实际上充当了Python和C/C++之间的胶水。利用Cython工具,可以用Python代码生成C代码,然后将C代码编译成接近于机器语言的二进制代码。Cytoolz软件包提供了许多实用程序,这些程序都是通过将Python的toolz软件包Cython化得到的。以下命令可以用来安装cython和cytoolz。

1 $ pip3 install cython cytoolz

   编译报错:找不到VC++编译器,可以在官方下载:https://support.microsoft.com/en-gb/help/2977003/the-latest-supported-visual-c-downloads

       进行后续处理前,先来看看经过Cython编译处理后的效果。Python的timeit模块可以帮助我们测量时间。下面通过它来对不同的函数进行测度。我们定义下面的函数,它需要的参数为一段代码、一个函数调用以及该段代码的运行次数,具体如下。

1 def time(code, n):
2     times = min(timeit.Timer(code, setup=setup).repeat(3, n))
3 
4     return round(1000* np.array(times)/n, 3)

  我们预定义了一个设置控制字符串,其中包含了所需的代码。下列代码摘自本书代码包中的timeit.py文件,需要在本地机器上使用cython_module进行编译。

 1 import timeit
 2 import numpy as np
 3 
 4 setup = ‘‘‘
 5 import nltk
 6 import cython_module as cm
 7 import collections
 8 from nltk.corpus import stopwords
 9 from nltk.corpus import movie_reviews
10 from nltk.corpus import names
11 import string
12 import pandas as pd
13 import cytoolz
14 
15 sw = set(stopwords.words(‘english‘))
16 punctuation = set(string.punctuation)
17 all_names = set([name.lower() for name in names.words()])
18 txt = movie_reviews.words(movie_reviews.fileids()[0])
19 
20 def isStopWord(w):
21     return w in sw or w in punctuation
22 
23 def isStopWord2(w):
24     return w in sw or w in punctuation or not w.isalpha()
25 
26 def isStopWord3(w):
27     return w in sw or len(w) == 1 or not w.isalpha() or w in all_names
28 
29 def isStopWord4(w):
30     return w in sw or len(w) == 1
31 
32 def freq_dict(words):
33     dd = collections.defaultdict(int)
34 
35     for word in words:
36         dd[word] += 1
37 
38     return dd
39 
40 def zero_init():
41     features = {}
42 
43     for word in set(txt):
44         features[‘count (%s)‘ % word] = (0)
45 
46 def zero_init2():
47     features = {}
48     for word in set(txt):
49         features[word] = (0)
50 
51 keys = list(set(txt))
52 
53 def zero_init3():
54     features = dict.fromkeys(keys, 0)
55 
56 zero_dict = dict.fromkeys(keys, 0)
57 
58 def dict_copy():
59     features = zero_dict.copy()
60 ‘‘‘
61 
62 def time(code, n):
63     times = min(timeit.Timer(code, setup=setup).repeat(3, n))
64 
65     return round(1000* np.array(times)/n, 3)
66 
67 if __name__ == __main__:
68     print("Best of 3 times per loop in milliseconds")
69     n = 10
70     print("zero_init ", time("zero_init()", n))
71     print("zero_init2", time("zero_init2()", n))
72     print("zero_init3", time("zero_init3()", n))
73     print("dict_copy ", time("dict_copy()", n))
74     print("\n")
75 
76     n = 10**2
77     print("isStopWord ", time([w.lower() for w in txt if not isStopWord(w.lower())], n))
78     print("isStopWord2", time([w.lower() for w in txt if not isStopWord2(w.lower())], n))
79     print("isStopWord3", time([w.lower() for w in txt if not isStopWord3(w.lower())], n))
80     print("isStopWord4", time([w.lower() for w in txt if not isStopWord4(w.lower())], n))
81     print("Cythonized isStopWord", time([w.lower() for w in txt if not cm.isStopWord(w.lower())], n))
82     print("Cythonized filter_sw()", time(cm.filter_sw(txt), n))
83     print("\n")
84 
85     print("FreqDist", time("nltk.FreqDist(txt)", n))
86     print("Default dict", time(freq_dict(txt), n))
87     print("Counter", time(collections.Counter(txt), n))
88     print("Series", time(pd.Series(txt).value_counts(), n))
89     print("Cytoolz", time(cytoolz.frequencies(txt), n))
90     print("Cythonized freq_dict", time(cm.freq_dict(txt), n))

  以上代码中有多个不同版本的isStopword()函数,下面是这些函数的运行时间,这里以毫秒(milliseconds)为单位。

isStopWord 0.843
isStopWord2 0.902
isStopWord3 0.963
isStopWord4 0.869
Cythonized isStopWord 0.924
Cythonized filter_sw() 0.887

  为了进行比较,我们还要对pass语句的运行时间进行计时。这里,Cython编译后的StopWord()函数是基于过滤最严格的isStopWord3()函数。如果考察prof_demo.py中的doc_features()函数,就会发现我们并没有仔细检查每个特征单词。相反,我们只对文档中的单词和被选为特征的单词感兴趣。因此,其他单词计数可以放心置零。事实上,如果我们把全部的值初始化为0,然后复制这个字典,那最好不过了。下面这些函数的相应执行时间如下。

Zero_init 0.61
Zero_init2 0.555
Zero_init3 0.017
Dict_copy 0.011

  另外一种改进性能的方法是,使用Python内置的defaultdict类,而非NLTK提供的FreqDist类。相应程序运行时间如下。

FreqDist 2.206
Default dict 0.674
Counter 0.79
Series 7.006
Cytoolz 0.542
Cythonized freq_dict 0.616

  就像我们看到的那样,Cython编译后的版本始终是要快一些,尽管有时候不是快很多。

 

 12.3调用C代码

   我们可以从Cython调用C函数。C语言的字符串函数strlen()相当于Python语言中的len()函数。当从一个Cython的.pyx文件中调用这个C函数时,需要将其导入,代码如下。

1 from libc.string cimport strlen

  这样,我们就可以在.pyx文件的其他地方来调用这个strlen()函数了。而这个.pyx文件可以随意包含任何的Python代码。下面的代码摘自本书代码包中的cython_module.pyx文件。

 1 from collections import defaultdict
 2 from nltk.corpus import stopwords
 3 from nltk.corpus import names
 4 from libc.string cimport strlen
 5 from nltk import data
 6 data.path.append(rD:\download\nltk_data) # 这里的路径需要换成自己数据文件下载的路径
 7 
 8 sw = set(stopwords.words(english))
 9 all_names = set([name.lower() for name in names.words()])
10 
11 def isStopWord(w):
12      py_byte_string = w.encode(UTF-8)
13      cdef char* c_string = py_byte_string
14      truth = (w in sw) or (w in all_names) or (not w.isalpha()) or (strlen(c_string) == 1)
15      return truth
16 
17 def filter_sw(words):
18     return [w.lower() for w in words if not isStopWord(w.lower())]
19 
20 def freq_dict(words):
21     dd = defaultdict(int)
22 
23     for word in words:
24         dd[word] += 1
25 
26     return dd

  编译这段代码,我们需要一个包含以下内容的setup.py文件。

1 from distutils.core import setup
2 from Cython.Build import cythonize
3 
4 setup(
5     ext_modules = cythonize("cython_module.pyx")
6 )

  编译程序代码的命令如下。

1 $ python3 setup.py build_ext –inplace

  以下输出显示了cython_module扩展的构建信息。

$python3 setup.py build_ext –inplace
Running build_ext
Building ‘cython_module’ extension
Creating build
Creating build/temp.macosx-10.12-x86_64-3.6
Clang -Wno-unused-result -Wsign-compare -Wunreachable-code -fno-common –
Dynamic -DNDEBUG -g -fwrapv -O3 -Wall -Wstrict-prototypes –
I/usr/local/include -I/usr/local/opt/openssl/include –
I/usr/local/opt/sqlite/include –
I/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/i
Nclude/python3.6m -c cython_module.c -o build/temp.macosx-10.12-
X86_64-3.6/cython_module.o
Clang -bundle -undefined dynamic_lookup build/temp.macosx-10.12-
X86_64-3.6/cython_module.o -L/usr/local/lib -L/usr/local/opt/openssl/lib –
L/usr/local/opt/sqlite/lib -o
/Users/armando/gdrive/projects/bitbucket/pubs/2016-pda-e2-
Packt/chapters/ch-12/cython_module.cpython-36m-darwin.so

  如今,我们可以修改这个情感分析程序,让它来调用Cython函数。此外,我们还可以根据前面曾经介绍的方法来改善这个代码的性能。因为一些函数会重复使用,所以我们将这些函数抽取出来并集中到本书代码包中的core.py文件中。下面的代码摘自本书代码包中的cython_demo.py文件(这些代码需要本地机器能够支持cython_module)。

 1 from nltk.corpus import movie_reviews
 2 from nltk import data
 3 data.path.append(rD:\download\nltk_data) # 这里的路径需要换成自己数据文件下载的路径
 4 
 5 from nltk import NaiveBayesClassifier
 6 from nltk.classify import accuracy
 7 import cython_module as cm
 8 import cytoolz
 9 from core import label_docs
10 from core import filter_corpus
11 from core import split_data
12 
13 
14 def select_word_features(corpus):
15     words = cytoolz.frequencies(filtered)
16     sorted_words = sorted(words, key=words.get)
17     N = int(.02 * len(sorted_words))
18 
19     return sorted_words[-N:]
20 
21 def match(a, b):
22     return set(a.keys()).intersection(b)
23 
24 def doc_features(doc):
25     doc_words = cytoolz.frequencies(cm.filter_sw(doc))
26 
27     # initialize to 0
28     features = zero_features.copy()
29 
30     word_matches = match(doc_words, word_features)
31 
32     for word in word_matches:
33         features[word] = (doc_words[word])
34 
35     return features
36 
37 def make_features(docs):
38     return [(doc_features(d), c) for (d,c) in docs]
39 
40 if __name__ == "__main__":
41     labeled_docs = label_docs()
42 
43     filtered = filter_corpus()
44     word_features = select_word_features(filtered)
45     zero_features = dict.fromkeys(word_features, 0)
46     featuresets = make_features(labeled_docs)
47     train_set, test_set = split_data(featuresets)
48     classifier = NaiveBayesClassifier.train(train_set)
49     print("Accuracy", accuracy(classifier, test_set))
50     print(classifier.show_most_informative_features())

  我们使用time命令执行代码。

1 $ time python3 cython_demo.py

  表12-3是对time命令运行结果的总结。注意,括号中的值是最小值。

时间类型 第一轮 第二轮 第三轮
Real (9.639) 9.817 9.912
User (9.604) 9.661 9.683
Sys (0.404) 0.424 0.451

 

  与以前的代码的执行时间相比,我们可以看到性能有了明显的提升。时间表12-4是从12.2节中转载过来的,以便于进行比较。

时间类型 第一轮 第二轮 第三轮
Real 11.521 10.808 (10.416)
User 9.758 9.826 (9.444)
Sys 0.965 0.643 (0.620)

 

 12.4利用multiprocessing创建进程池

   Multiprocessing是Python的一个标准模块,可以用于多处理器机器。Multiprocessing通过创建多个进程,成功解决了全局解释器锁(the Global Interpreter Lock,GIL)问题。

  提示:GIL会锁定Python的字节码,导致只有一个线程可以访问这些字节码。

  Multiprocessing支持进程池、队列和管道技术。进程池实际上就是可以并行执行一个函数的一组系统进程。队列是一些数据结构,通常用于存储任务。管道用来连接不同的进程,而且连接方式为一个进程的输出作为另一个进程的输入。

  提示:Python的Windows平台版本没有实现os.fork()函数,因此我们务必确保导入该函数并且将def语句块定义在if name == “main”语句块之外。

  下面创建一个进程池并注册一个函数,代码如下。

1 p = mp.Pool(nprocs)

  进程池有一个map()方法,我们可以看成是Python并行的map()函数。

1    p.map(simulate, [i for i in range(10, 50)])

  模拟微粒的一维运动,它实际上进行的是随机游走,我们这里关心的是微粒终点位置的均值。我们重复这个模仿实验,每次具有不同的步长。实际上,计算本身并不重要,重要的是与单个进程相比,多个进程的加速效果如何,我们将通过matplotlib绘制加速比。完整的代码摘自本书代码包中的multiprocessing_sim.py文件。

 1 from numpy.random import random_integers
 2 from numpy.random import randn
 3 import numpy as np
 4 import timeit
 5 import argparse
 6 import multiprocessing as mp
 7 import matplotlib.pyplot as plt
 8 
 9 
10 def simulate(size):
11     n = 0
12     mean = 0
13     M2 = 0
14 
15     speed = randn(10000)
16 
17     for i in range(1000): 
18         n = n + 1
19         indices = random_integers(0, len(speed)-1, size=size)
20         x = (1 + speed[indices]).prod()
21         delta = x - mean
22         mean = mean + delta/n
23         M2 = M2 + delta*(x - mean)
24 
25     return mean
26 
27 def serial():
28     start = timeit.default_timer()
29 
30     for i in range(10, 50):
31         simulate(i)
32     
33     end = timeit.default_timer() - start
34     print("Serial time", end)
35 
36     return end
37 
38 def parallel(nprocs):
39     start = timeit.default_timer()
40     p = mp.Pool(nprocs)
41     print(nprocs, "Pool creation time", timeit.default_timer() - start)
42 
43     p.map(simulate, [i for i in range(10, 50)])
44     p.close()
45     p.join()
46 
47     end = timeit.default_timer() - start
48     print(nprocs, "Parallel time", end)
49     return end
50 
51 if __name__ == "__main__":
52     ratios = []
53     baseline = serial()
54 
55     for i in range(1, mp.cpu_count()):
56         ratios.append(baseline/parallel(i))
57 
58     plt.xlabel(# processes)
59     plt.ylabel(Serial/Parallel)
60     n = np.arange(1, mp.cpu_count())
61     plt.plot(n, ratios)
62     plt.grid(True)
63     plt.show()

  当进程池大小由1变到8时,加速比的变化如图12-3所示。

技术分享图片

 

  阿姆达尔定律(详情请访问http://en.wikipedia.org/wiki/Amdahl%27s_law)代表了处理器平行运算后效率提升的能力。这个定律可以用来预测加速比的最大可能值。进程的数量限制了加速比的绝对最大值。不过,图12-3,使用双进程时速度并没有加倍,使用3个进程时速度也没有比原来快3倍,但是可以接近这个方向。任何给定的Python代码,总会有些部分是无法并行化的。例如,我们可能需要等待资源被释放,或者进行的计算必须串行完成。有时,我们还必须考虑并行化配置和进程间相应通信带来的开销。阿姆达尔定律指出,加速比的倒数、进程数量的倒数和代码中无法并行计算部分所占的比例之间存在线性关系。

 12.5通过Joblib提高for循环的并发性

   Joblib是一个由scikit-learn的开发者创建的Python库,旨在改善长时间运行的Python函数的性能。实际上,Joblib是通过在幕后使用多进程或者线程技术来实现高速缓存和并行化从而达到提升性能的目的。安装Joblib的方法如下。

1 $ pip3 install joblib

  我们会重新使用前面的示例代码,只是变更parallel()函数。相关的代码摘自本书代码包中的joblib_demo.py文件。

1 def parallel(nprocs):
2     start = timeit.default_timer()
3     Parallel(nprocs)(delayed(simulate)(i) for i in range(10, 50))
4 
5     end = timeit.default_timer() - start
6     print(nprocs, "Parallel time", end)
7     return end

  最终结果如图12-4所示(注意,实际的处理器数量取决于我们的硬件)。

 技术分享图片

 

12.6比较Bottleneck函数与NumPy函数

   Bottleneck是受到Numpy和Scipy的启发而创建的一组函数,它们着眼于高性能,都是由Cython写成的。Bottleneck为数组维数、坐标轴和数据类型的每种组合都提供了单独的Cython函数。但是,这并不表示最终用户和Bottleneck的限制因素决定了到底执行哪一个Cython函数。安装Bottleneck的命令如下。

1 $ pip3 install bottleneck

   下面对numpy.median()和SciPy.stats的执行时间进行比较。

  这对人为决定是否使用Cython函数很有帮助,尤其是在将这些Cython函数用于紧密循环(tight loop)或者频繁调用的函数中时。下面打印Bottleneck中median()函数的名称,代码如下。

 1 import bottleneck as bn
 2 import numpy as np
 3 import timeit
 4 
 5 
 6 setup = ‘‘‘
 7 import numpy as np
 8 import bottleneck as bn
 9 from scipy.stats import rankdata
10 
11 np.random.seed(42)
12 a = np.random.randn(30)
13 ‘‘‘
14 def time(code, setup, n):
15     return timeit.Timer(code, setup=setup).repeat(3, n)
16 
17 if __name__ == __main__:
18     n = 10**3
19     print(n, "pass", max(time("pass", "", n)))
20     print(n, "min np.median", min(time(np.median(a), setup, n)))
21     print(n, "min bn.median", min(time(bn.median(a), setup, n)))
22     a = np.arange(7)
23     print("Median diff", np.median(a) - bn.median(a))
24     
25     print(n, "min scipy.stats.rankdata", min(time(rankdata(a), setup, n)))
26     print(n, "min bn.rankdata", min(time(bn.rankdata(a), setup, n)))

  下面是函数名和相应的运行时间。

1000 pass 8.700000000000374e-06
1000 min np.median 0.03629350000000109
1000 min bn.median 0.0003940999999993977
Median diff 0.0
1000 min scipy.stats.rankdata 0.04359799999999936
1000 min bn.rankdata 0.0010945000000006644

  显然,Bottleneck快极了。不过,Bottleneck提供的函数还不多。表12-5给出了已实现的函数。

种类 函数
NumPy/SciPy median、nanmedian、rankdata、ss、nansum、nanmin、nanmax、nanmean、nanstd、nanargmin和nanargmax
函数 nanrankdata、nanvar、partsort、argpartsort、replace、nn、anynan和allnan
滑动窗口 move_sum、move_nansum、move_mean、move_nanmean、move_median、move_std、move_nanstd、move_min、move_nanmin、move_max和move_nanmax

 

12.7通过Jug实现MapReduce

  Jug是一个分布式计算框架,它以任务作为并行化的主要单位。作为后端程序,Jug要用到文件系统或者Redis服务器。Redis服务器已经在第8章中介绍过,下面我们介绍Jug的安装方法,命令如下。

1 $ pip3 install jug

  MapReduce是一种分布式算法,它可以通过计算机集群来处理大规模数据。这个算法通常包含映射和化简两个步骤。映射阶段,数据是以并行方式进行处理的。这时,数据将被划分成一些数据块,而且过滤及其他操作都是针对每个数据块进行的。化简阶段对映射阶段的处理结果进行合并,如创建一个统计报告。

  如果有一个文本文件列表,那么我们可以针对每个文件来计算单词计数,这个工作可以在映射阶段完成。最后,我们可以把各个单词计数组合成一个语料库词频字典。Jug提供了MapReduce功能,我们可以通过本书代码包中的jug_demo.py文件加以演示。需要注意的是,这段代码依赖于cython_module。

 1 import jug.mapreduce
 2 from jug.compound import CompoundTask
 3 import cython_module as cm
 4 import cytoolz
 5 import pickle
 6 
 7 def get_txts():
 8     return [(1, Lorem ipsum dolor sit amet, consectetur adipiscing elit.), 
(2, Donec a elit pharetra, malesuada massa vitae, elementum dolor.),
(3, Integer a tortor ac mi vehicula tempor at a nunc.)] 9 10 def freq_dict(file_words): 11 filtered = cm.filter_sw(file_words[1].split()) 12 13 fd = cytoolz.frequencies(filtered) 14 15 return fd 16 17 def merge(left, right): 18 return cytoolz.merge_with(sum, left, right) 19 20 merged_counts = CompoundTask(jug.mapreduce.mapreduce, merge, freq_dict, get_txts(), map_step=1)

   ModuleNotFoundError: No module named ‘cython_module‘

  上述代码在化简阶段调用了merge()函数,在映射阶段调用了freq_dict()函数。此外,我们还定义了一个包含多个子任务的Jug CompoundTask。运行这段代码前,需要启动一个Redis服务器。然后,就可以利用以下命令进行MapReduce处理了。

1 $ jug execute jug_demo.py –jugdir=redis://127.0.0.1/&

  以上命令末尾的&符号表示这条命令将在后台运行。利用这种方式,可以从多台计算机来执行该命令,只要Redis服务器可以通过访问即可。在这个例子中,Redis仅在本地计算机上运行,本地主机的IP地址是127.0.0.1。但是,我们仍然可以在本机上多次运行该命令。我们可以查看该Jug命令的状态,方法如下。

1 $ jug status jug_demo.py

  默认情况下,如果没有设置jugdir参数,Jug会将数据存放到当前工作目录下。如果要清除Jug相关目录,我们可以使用如下命令。

1 $ jug cleanup jug_demo.py

  如果想查询Redis并进行其他分析工作,还需要用到其他程序。

  在这个程序中,使用下列命令来初始化Jug。

1 jug.init(‘jug_demo.py’, ‘redis://127.0.0.1/’)
2 Import jug_demo

  以下代码用来获得化简阶段生成的结果。

1 words = jug.task.value(jug_demo.merged_counts)

  以下代码取自本书代码包中的jug_redis.py文件。

1 import jug
2 
3 def main():
4     jug.init(jug_demo.py, redis://127.0.0.1/)
5     import jug_demo
6     print("Merged counts", jug.task.value(jug_demo.merged_counts))
7 
8 if __name__ == "__main__":
9     main()

 

12.8安装MPI for Python

   消息传递接口(The Message Passing Interface,MPI)是一种标准协议,由计算机专家开发用来实现分布式计算机的广泛协作。20世纪90年代,MPI主要用于Fortran和C语言编写的程序中,但是,它不依赖具体的硬体和计算机语言。MPI函数可以完成发送和接收操作、实现MapReduce功能和同步。MPI既有处理两个处理器的点对点函数,也提供了处理所有处理器之间的操作的相应函数。MPI支持多种计算机语言,也就是说,提供了多种语言绑定,其中就包括Python。读者可以自行下载MPI,在本书写作时,MPI的最新版本为2.0.2。当然,我们也可以通过网络检查是否有更新的版本。安装MPI的时候时间可能有点长,大概需要30min。下面是具体的安装命令,这里假设将其安装到/usr/local目录下面。

1 $ ./configure –prefix=/usr/local
2 $ make all
3 $ sudo make install

  下面来安装MPI的Python绑定,命令如下。

1 $ pip3 install mpi4py

 

12.9IPython Parallel

   IPython Parallel是用于并行计算的IPython应用程序接口。这里,我们用它来通过MPI来传递消息。为此,我们可能需要设置相应的环境变量,命令如下。

$ export LC_ALL=en_US.UTF-8
$ export LANG=en_US.UTF-8

  在命令行执行以下命令。

1 $ ipython3 profile create –parallel –profile=mpi

  以上命令将在我们的home目录下面创建文件,具体路径为.ipython/ profile_mpi/。

  启动一个使用MPI性能分析的集群,命令如下。

1 $ ipcluster start –profile=mpi –engines=MPI –debug

  上面的命令中,规定性能分析工具为mpi,同时指定MPI引擎提供调试级别的记录功能。这样,我们就可以通过IPython Notebook与集群进行交互了。输入下列命令,我们得到一个具有绘图功能的笔记本(notebook),同时NumPy、SciPy和matplotlib会自动导入。

1 $ jupyter-notebook –profile=mpi –log-level=DEBUG

  以上命令使用mpi作为调试记录级别的性能分析工具。对于这个笔记本例子而言,它存放于本书代码包中的IPythonParallel.ipynb文件中。下面导入IPython Parallel的Client类和statsmodels.api模块,命令如下。

1 from ipyparallel import Client
2 import statsmodels.api as sm

  下面加载太阳黑子数据,然后计算平均值。

1 data_loader = sm.datasets.sunspots.load_pandas()
2 vals = data_loader.data[‘SUNACTIVITY’].values
3 glob_mean = vals.mean()
4 glob_mean

  以下是输出结果。

Out [2]: 49.752103559870541

  创建一个客户端,命令如下。

1 c = Client(profile=’mpi’)

为客户端创建一个视图,命令如下。

1 view=c[:]

  IPython提供了许多魔术(magics)命令,即IPython notebooks的一些专用的命令。如果要启用这些命令,方法如下。

1 view.activate()

  下面来加载摘自本书代码包中的mpi_ipython.py文件。

 1 from mpi4py import MPI
 2 from numpy.random import random_integers
 3 from numpy.random import randn
 4 import numpy as np
 5 import statsmodels.api as sm
 6 import bottleneck as bn
 7 import logging
 8 
 9 
10 def jackknife(a, parallel=True):
11     data_loader = sm.datasets.sunspots.load_pandas()
12     vals = data_loader.data[SUNACTIVITY].values
13 
14     results = []
15 
16     for i in a:
17         tmp = np.array(vals.tolist())
18         tmp[i] = np.nan
19         results.append(bn.nanmean(tmp))
20 
21     results = np.array(results)
22 
23     if parallel:
24         comm = MPI.COMM_WORLD
25         rcvBuf = np.zeros(results.shape)
26         comm.gather([results, MPI.DOUBLE], [rcvBuf, MPI.DOUBLE])
27 
28     return results
29 
30 if __name__ == "__main__":
31     skiplist = np.arange(39, dtype=int)
32     print(jackknife(skiplist, True))
33     

  上面的程序中包含了一个执行刀切法重采样的函数。刀切法重采样技术是一种重采样技术,即先删除样本中的一个观测值,然后再根据需要进行相应的统计估计。就本例而言,我们关心的是平均值。剔除一个观测值的方法是:将其置为NumPy NaN,然后对新样本执行Bottleneck的nanmean()函数。下面先来执行加载任务。

1 view.run(‘mpi_ipython.py‘)

  然后,拆取并扩展一个数组,让它带有太阳黑子数组全部索引。

1 view.scatter(‘a‘,np.arange(len(vals),dtype=‘int‘))

  数组a可以在笔记本中显示出来,命令如下。

1 view[‘a‘]

  以上命令的输出结果如下。

Out[8]:[array([ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14,
15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32,
33, 34, 35, 36, 37, 38]), … TRUNCATED …]

  在所有客户端上调用jackknife()函数。

1 %px means = jackknife(a)

  所有工作进程(worker processes)结束后,就可以查看结果了。

1 view[‘means‘]

  结果是一个列表,长度与我们启动的进程数量相同。每个进程都返回一个NumPy数组,其中存放刀切法重采样技术计算得到的平均值。这个结构不是十分有用,因此将其转换为一个比较宽的列表。

1 all_means = []
2 for v in view[‘means‘]:
3     all_means.extend(v)
4     mean(all_means)

  这时输出结果如下。

Out [11]: 49.752103559870577

  我们还可以计算标准差,不过这个太简单,这里就不多解释了。下面,我们不妨将刀切法求出的平均值用直方图画出来。

1 hist(all_means, bins=sqrt(len(all_means)))

  进行故障排除时,我们可以使用以下命令来显示工作进程的错误信息。

 [(k, c.metadata[k][‘started‘], c.metadata[k][‘pyout‘],
c.metadata[k][‘pyerr‘]) for k in c.metadata.keys()]

 

12.10小结

   本章对摘自第9章中的情感分析脚本进行了性能调优。我们通过性能分析、Cython和多种性能提升措施,最终让它的速度翻倍了。此外,我们还使用multiprocessing、Joblib、Jug和MPI via IPython Parallel充分利用了并行化。

  本章是本书的最后一章,当然,我们学习的步伐不会就此停住,我们需要不停地修改代码,直到能够满足要求为止。如果我们有一个专门的数据分析项目,那再好不过了,即使仅仅是一个用于练手的项目。如果还没有,我们不妨参加竞赛活动。许多竞赛都提供了不错的奖品。

 

第12章完。

 

附录A:重要概念

   本附录简要回顾本书中涉及的技术术语和概念。

 

附录B常用函数

  本附录列出了一些常用函数并根据其所属程序包进行组织,这些程序包涉及matplotlib、NumPy、Pandas、scikit-learn和SciPy。

Matplotlib

下面是常用的matplotlib函数。

NumPy

下面是常用的NumPy函数。

Pandas

下面是常用的pandas函数。

Scikit-learn

下面是常用的scikit-learn函数。

SciPy

 下面是常用的SciPy函数。

Scipy.fftpack

Scipy.signal

Scipy.stats

 附录完

 

python数据分析个人学习读书笔记-目录索引

 

随书源码官方下载:
https://www.ptpress.com.cn/shopping/buy?bookId=bae24ecb-a1a1-41c7-be7c-d913b163c111

需要登录后免费下载。

 

原文:https://www.cnblogs.com/downmoon/p/12635326.html

评论(0
© 2014 bubuko.com 版权所有 - 联系我们:wmxa8@hotmail.com
打开技术之扣,分享程序人生!