These past few days, besides preparing for the Jiaotong University re-examination, I have been busy writing an image-scraping program in Python. Fortunately the admission text message arrived a few days ago, so I can finally relax; the year of secluded study has paid off. Hunting for an internship in Xi'an, however, I have hit a wall everywhere. Dear interviewers, you really shouldn't judge by looks; why hire him just because he is better-looking than I am... Never mind. Real men don't cry easily, they just haven't been hurt badly enough yet.
The program's core functionality is basically done, but it was originally written to scrape a single site: switch to another site and the HTML-parsing code has to be rewritten from scratch, which is a real nuisance. So I decided to refactor the code around a design pattern, so that it can be applied to most image sites and perhaps even to video sites. Since the program is still under development, I'm afraid I cannot reveal the actual pages being scraped. Let's look at how the program is implemented:
1. The ImageLister class
First, I designed an ImageLister class that is responsible for parsing an HTML page (using BeautifulSoup 3) and returning the image URLs; when the page is paginated, it detects the pagination automatically and walks through every page in order. The class design is shown in Figure 1.
Figure 1: ImageLister class design
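The figure itself is not reproduced here, so the outline below, distilled from the code that follows, roughly shows what Figure 1 depicts: a handful of accessors plus the two parsing hooks that each site-specific subclass overrides.

# rough outline of the ImageLister interface (see the full code below)
class ImageLister(object):
    def getFirstPage(self): pass   # entry-page URL
    def getTitle(self): pass       # page title (via anlzTitle)
    def getInfo(self): pass        # page description
    def getPages(self): pass       # all pagination URLs
    def getImages(self): pass      # all image URLs (via anlzAllImageUrls)
    # site-specific hooks, meant to be overridden by subclasses
    def anlzAllPageUrls(self): pass
    def anlzAllImageUrls(self): pass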
The implementation of the ImageLister class is as follows:
# imagelister.py
# -*- coding: utf-8 -*-
from BeautifulSoup import BeautifulSoup
from urlparse import urljoin
from re import sub
from ulib import uopen, uclose


class ImageLister(object):
    def __init__(self, first_page):
        self.first_page = first_page
        self.title = ""
        self.info = ""
        self.pages = []
        self.images = []

    def getFirstPage(self):
        return self.first_page

    def getPages(self):
        return self.pages

    def getImages(self):
        self.images = self.anlzAllImageUrls()
        return self.images

    def getTitle(self):
        return self.title

    def getInfo(self):
        return self.info

    def getHtmlSrc(self, url):
        u = uopen(url)
        src = u.read()
        uclose(u)
        return src

    # Parse the page title
    def anlzTitle(self, data):
        soup = BeautifulSoup(data, fromEncoding="gb18030")
        title = soup.html.head.title.string.strip()
        return title

    def anlzAllPageUrls(self):
        pass

    def anlzAllImageUrls(self):
        pass

The implementation of the ImageListerA class is as follows:
# imagelistera.py
# -*- coding: utf-8 -*-
from imagelister import *


class ImageListerA(ImageLister):
    def __init__(self, first_page):
        super(ImageListerA, self).__init__(first_page)
        data = self.getHtmlSrc(first_page)
        self.title = self.anlzTitle(data)
        self.info = self.anlzInfo(data)
        self.pages = self.anlzAllPageUrls(data, first_page)

    # Parse the page description
    # The body of this function is page-specific; no need to study it closely
    def anlzInfo(self, data):
        soup = BeautifulSoup(data, fromEncoding="gb18030")
        comment = soup.find("div", {"class": "comment2"})
        info = comment.find("span", {"class": "i_user"}).string
        info += "\n"
        contents = comment.find("font", {"color": "#999999"}).contents
        for content in contents:
            temp_con = content.strip()
            info += sub(r"<br\s*/?>", "\n", temp_con)
        return info

    # Parse all pagination links
    # The body of this function is page-specific; no need to study it closely
    def anlzAllPageUrls(self, data, first_page):
        soup = BeautifulSoup(data, fromEncoding="gb18030")
        pagination = soup.find("div", {"id": "pagination"})
        alinks = pagination.findAll("a")[1:-3]
        pages = []
        pages.append(first_page)
        for alink in alinks:
            page = urljoin(first_page, alink["href"])
            pages.append(page)
        return pages

    # Walk every pagination page and collect all image links
    # The body of this function is page-specific; no need to study it closely
    def anlzAllImageUrls(self):
        pages = self.pages
        images = []
        for page in pages:
            u = uopen(page)
            data = u.read()
            soup = BeautifulSoup(data, fromEncoding="gb18030")
            imglinks = soup.findAll("img", {"class": "IMG_show"})
            for imglink in imglinks:
                image = imglink["src"]
                images.append(image)
        return images


# Unit test only
if __name__ == "__main__":
    lister = ImageListerA("**************************************.htm")
    print lister.getTitle()
    print lister.getInfo()
    print lister.getPages()
    print "\n".join(lister.getImages())
Similarly, ImageListerB overrides the methods of ImageLister according to the site it scrapes. To support a new site, all you need to do is create a new ImageListerXXX class that inherits from ImageLister and implements the HTML-parsing logic suited to that site; a rough sketch of such a subclass follows.
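As an illustration only, here is a minimal sketch of what such a subclass might look like. The class name ImageListerB simply mirrors the naming above, and the img selector ({"class": "photo"}) is invented for the example; a real subclass would use whatever markup the target site actually has.

# imagelisterb.py
# -*- coding: utf-8 -*-
from imagelister import *


class ImageListerB(ImageLister):
    def __init__(self, first_page):
        super(ImageListerB, self).__init__(first_page)
        data = self.getHtmlSrc(first_page)
        self.title = self.anlzTitle(data)
        # this hypothetical site has no pagination, so the entry page is the only page
        self.pages = [first_page]

    # Override only the page-specific parsing; everything else is inherited
    def anlzAllImageUrls(self):
        images = []
        for page in self.pages:
            data = self.getHtmlSrc(page)
            soup = BeautifulSoup(data, fromEncoding="gb18030")
            # hypothetical markup: every picture is an <img class="photo"> tag
            for imglink in soup.findAll("img", {"class": "photo"}):
                images.append(imglink["src"])
        return images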
Here, ulib.py is a small library I wrote myself; it provides a URL-opening function with retry support plus a few other commonly used helpers (an expanded version appears in section 3 below):
# -*- coding: utf-8 -*-
# ulib.py
from urllib2 import urlopen, HTTPError, URLError
from time import sleep


def uopen(url, verbose=True):
    retryTimes = 5
    sleepTime = 10
    while retryTimes > 0:
        try:
            if verbose:
                print u"正在读取:", url
            u = urlopen(url)
            # Opened successfully
            if u.code == 200:
                return u
            elif u.code == 201:
                break
        except HTTPError, e:
            print e
            if e.code == 404:
                break
        except URLError, e:
            print e
        except BaseException, e:
            print e
        retryTimes -= 1
        if verbose:
            print u"读取失败,等待重试……"
        sleep(sleepTime)
    return None


def uclose(u):
    u.close()


# Format a file size for display
# e.g. 10 => "10B", 1024 => "1KB" ...
def formatSize(size):
    size = float(size)  # avoid integer division truncating the result
    if size > pow(1024, 2):
        new_size = size / pow(1024, 2)
        postfix = "MB"
    elif size > 1024:
        new_size = size / 1024
        postfix = "KB"
    else:
        new_size = size
        postfix = "B"
    strsize = "%.2f" % new_size
    return strsize + postfix
2. The ImageCatcher class
The ImageCatcher class obtains the image URLs from an ImageLister and saves the images locally (dirname = save_path + title); it also records the page address (first_page), the title (title), and the description (info).
The implementation of the ImageCatcher class is as follows:
# -*- coding: utf-8 -*-
# imagecatcher.py
import os
from time import clock
import urlparse
from imagelistera import ImageListerA
from ulib import uopen, uclose, formatSize

IMAGE_URL_FILE = "image_urls.txt"
IMAGE_INFO_FILE = "image_info.txt"


class ImageCatcher(object):
    def __init__(self, save_path, image_lister):
        self.image_lister = image_lister
        self.save_path = save_path
        self.first_page = image_lister.getFirstPage()
        self.title = image_lister.getTitle()
        self.info = image_lister.getInfo()
        self.dirname = os.path.join(save_path, self.title)
        self.__createDir(self.dirname, verbose=False)
        #print self.first_page
        print self.title
        #print self.info
        #print self.dirname
        self.downAllImages()

    # Create the image directory
    def __createDir(self, dirname, verbose=True):
        if not os.path.exists(dirname):
            os.makedirs(dirname)
            if verbose:
                print u"已创建:%s" % dirname
            return True
        else:
            if verbose:
                print u"已存在:%s" % dirname
            return False

    # Download all images
    def downAllImages(self, verbose=True):
        filename = os.path.join(self.dirname, IMAGE_URL_FILE)
        # Read the URLs from the local file if it already exists
        if os.path.exists(filename):
            if verbose:
                print u"已存在:%s" % filename
            images = self.__readImageUrls(filename, verbose)
        # Otherwise fetch the URLs remotely and save them to the file
        else:
            images = self.__saveImageUrls(filename, verbose)
        self.images = images
        imageNum = len(images)
        i = 0
        for image in images:
            i += 1
            if verbose:
                print "%d/%d" % (i, imageNum)
                print image
            self.__saveImage(image)
        # Save the info file
        filename = os.path.join(self.dirname, IMAGE_INFO_FILE)
        self.__saveInfo(filename, verbose)

    # Read the image URLs from the local file
    def __readImageUrls(self, filename, verbose=True):
        f = open(filename, "r")
        images = []
        for line in f:
            images.append(line.rstrip("\n"))
        f.close()
        if verbose:
            print u"搜索到:%d 张" % len(images)
        return images

    # Fetch the image URLs remotely and save them to a file
    def __saveImageUrls(self, filename, verbose=True):
        images = self.image_lister.getImages()
        if verbose:
            print u"搜索到:%d 张" % len(images)
        f = open(filename, "w")
        for image in images:
            f.write(image)
            f.write("\n")
        f.close()
        if verbose:
            print u"已写入:%s" % filename
        return images

    def __saveImage(self, url, verbose=True):
        basename = url.split("/")[-1]
        dirname = self.dirname
        assert(os.path.exists(dirname))
        filename = os.path.join(dirname, basename)
        file_size = 0
        if os.path.exists(filename):
            print u"文件已存在:%s" % filename
        else:
            u = uopen(url)
            if u is None:
                return
            block_size = 8192
            downloaded_size = 0
            length = u.info().getheaders("Content-Length")
            if length:
                file_size = int(length[0])
                print u"文件大小:%s" % formatSize(file_size)
            f = open(filename, "wb")
            print u"正在下载:%s" % url
            start = clock()
            try:
                while True:
                    buffer = u.read(block_size)
                    if not buffer:
                        # EOF
                        break
                    downloaded_size += len(buffer)
                    f.write(buffer)
                    # Show download progress
                    if file_size:
                        print "%2.1f%%\r" % (float(downloaded_size * 100) / file_size),
                    else:
                        print '...'
            except BaseException, e:
                print e
                f.close()
                if os.path.exists(filename):
                    os.remove(filename)
                    print u"已删除损坏的文件:%s" % filename
                exit()
            finally:
                uclose(u)
                f.close()
            print u"文件已保存:%s" % os.path.abspath(filename)
            end = clock()
            spend = end - start
            print u"耗时:%.2f 秒" % spend
            print u"平均速度:%.2fKB/s" % (float(file_size) / 1024 / spend)

    # Save the info file
    # The file contains: url, title, info
    def __saveInfo(self, filename, verbose=True):
        if not os.path.exists(filename):
            info = self.image_lister.getInfo()
            f = open(filename, "w")
            f.write(self.first_page)
            f.write("\n")
            # Note: unicode is converted to UTF-8 here before writing
            if self.title:
                f.write(self.title.encode("utf-8"))
                f.write("\n")
            if self.info:
                f.write(self.info.encode("utf-8"))
                f.write("\n")
            f.close()
            if verbose:
                print u"已写入:%s" % filename
            return True
        else:
            if verbose:
                print u"已存在:%s" % filename
            return False


# Unit test only
if __name__ == "__main__":
    lister = ImageListerA("http://*************************************.htm")
    #lister = ImageListerB("http://*************************************.htm")
    ImageCatcher("pics", lister)  # "pics" is a relative path
3. The ulib library
ulib is the URL-handling library I implemented myself (thanks for the reminder). The code is as follows, with a short usage sketch after it:
# -*- coding: utf-8 -*-
# ulib.py
import urllib2
from urllib2 import Request, urlopen, HTTPError, URLError
from time import sleep
import socket

RETRY_TIMES = 5
SLEEP_TIME = 10


def uopen(url, headers={}, timeout=None, verbose=True):
    retryTimes = RETRY_TIMES
    sleepTime = SLEEP_TIME
    if headers:
        try:
            r = Request(url, headers=headers)
            u = urlopen(r)
        except HTTPError, e:
            print e
            print u"服务器已禁止断点续传"
        else:
            return u
    while retryTimes > 0:
        try:
            u = urlopen(url)
            if verbose:
                print u"正在连接:", url
            # Connected successfully
            if u.code == 200:
                return u
            elif u.code == 201:
                break
        except HTTPError, e:
            print e
            if e.code == 404:
                break
        except URLError, e:
            print e
        except socket.timeout, e:
            print u"连接超时,等待重试……"
        except KeyboardInterrupt, e:
            print u"用户强制中止"
            exit()
        except BaseException, e:
            print e
        retryTimes -= 1
        #if verbose:
        #    print u"读取失败,等待重试……"
        sleepTime = SLEEP_TIME  # reset the countdown for each retry
        try:
            # Sleep in one-second steps so Ctrl-C can interrupt the wait
            while sleepTime > 0:
                sleep(1)
                sleepTime -= 1
        except KeyboardInterrupt, e:
            print u"用户强制中止"
            if retryTimes == 0:
                exit()
    return None


def uread(u):
    data = u.read()
    return data


def uclose(u):
    u.close()


# Format a file size for display
# e.g. 10 => "10.00B", 1024 => "1.00KB" ...
def formatSize(size):
    size = float(size)  # avoid integer division truncating the result
    if size > pow(1024, 2):
        new_size = size / pow(1024, 2)
        postfix = "MB"
    elif size > 1024:
        new_size = size / 1024
        postfix = "KB"
    else:
        new_size = size
        postfix = "B"
    strsize = "%.2f" % new_size
    return strsize + postfix


if __name__ == '__main__':
    # Unit test
    pass
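As a quick usage sketch (the URL is only a placeholder, not one of the real target pages): open a resource with retries, read it, report its size, and close it.

# usage sketch for ulib; the URL below is a placeholder
from ulib import uopen, uread, uclose, formatSize

u = uopen("http://www.example.com/some/page.htm")
if u is not None:
    data = uread(u)
    uclose(u)
    print formatSize(len(data))   # e.g. "12.34KB"
else:
    print "open failed after all retries"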
Note: while writing this program I ran, as usual, into maddening encoding problems. My development platform is Windows 7 with Python 2.7. After a lot of fiddling I finally settled on an approach that handles the garbled-text problem fairly well:
1. Always work with unicode inside the program. BeautifulSoup 3 already returns unicode by default, so all you have to do is prefix your own string literals with 'u' and use unicode everywhere.
2. Unicode can be printed directly to IDLE or cmd.exe; the system converts it to GBK automatically for output (provided your system code page is cp936 / GBK).
3. When saving text files, encode unicode as UTF-8 before writing; when reading them back, decode UTF-8 into unicode (a short sketch follows this list).
4. I have not tried any of this on Linux yet; I will test it some other day.
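A minimal sketch of points 1 and 3, using a made-up file name and sample string:

# -*- coding: utf-8 -*-
# sketch of the unicode handling described above (file name is hypothetical)
text = u"图片标题"          # keep everything as unicode in memory

# printing unicode directly: the console converts it to the local code page (GBK on cp936)
print text

# write: encode unicode to UTF-8 before saving
f = open("info.txt", "w")
f.write(text.encode("utf-8"))
f.close()

# read: decode UTF-8 back to unicode
f = open("info.txt", "r")
text2 = f.read().decode("utf-8")
f.close()
assert text == text2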
(To be continued)