使用Python实现自己的小说追更公众号教程

一、前言

和很多男生一样,我一直比较喜欢看小说,而且喜欢的大多起点上面那些没什么营养的玄幻小说(手动捂脸)。喜欢看小说的应该都知道,看完本的话基本会陷入除了吃饭睡觉,会“加班加点”地将小说一口气追完的状态,这样完本的小说基本没几天就会看完。这时候便会不得不看日更的小说,追更的痛苦就是我每天都会多次打开小说APP,查看小说有没有更新(人肉轮询),浪费很多时间不说,没更新也难免失望。当然了土豪可以使用更新短信提醒,一毛钱一条哦。 作为一个有(mei)追(you)求(qian)的programer,就想着能不能自己动手,做一个小说追更程序,实时的将更新的小说通过微信消息推送到手机上,这样就不用一次次手动的打开APP去查看小说有没有更新。于是,就有了这篇文章…

二、最终效果

好了,本文采用倒序手法,首先呈现的是程序的最终效果。 当小说有更新时,你的手机上会收到下面这样的通知(还支持勿扰模式哦): example 点击详情可以直接阅读最新章节(支持正版请禁用该功能并到官方APP中阅读正版,仅保留通知功能): view

三、实现架构

程序主要包含三个关键组件:

  • 小说更新爬虫程序:定时循环爬取小说源站,监控小说更新。
  • 微信token服务器:统一获取并保存微信公众号的access token,供微信公众号程序使用。
  • 微信公众号服务程序:接收爬虫程序的更新章节推送,并将更新通过微信消息发送给用户。

流程图: architectures

四、组件详解

1. 小说更新爬虫

github源码地址:https://github.com/shangyexin/get-latest-novel

①功能简介

该模块的功能主要是循环监控小说网站指定小说的更新,如果更新的话将更新的小说信息post到指定的url,另一个微信公众号后台程序会监听该指定url,将更新的小说信息通过微信发送给用户。 监控的小说源站是一个稍大的小说采集站,因为这类网站大多采用相同的模板,网页结构较为简单,而且基本上没有反爬措施。

②流程图

下面是一个简单的流程图描述: get-latest-novel

③核心源码

主要使用的就是Python的requestsBeautifulSoup库,解析小说的最新章节是否有变化。 部分实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def getFromWebsite(bookName, url):
bookInfo = {'bookName': '', 'latestChapter': '', 'updateTime': '', 'latestUrl': ''}
novel = requests.get(url, timeout=60)
novel.encoding = "gbk"
soup = BeautifulSoup(novel.text, "html.parser")
last = soup.find_all(attrs={'class': 'last'})
# 更新时间在第一个last
updateInfo = last[0].contents[0]
updateTime = re.search('\d.*$', updateInfo).group(0)
# print('updateTime is ' + updateTime)
# 章节在第二个last
lastText = last[1].contents[1]
# 最新章节网址
latestUrl = lastText.get('href')
# 完整网址
completeUrl = config.baseOriginUrl + latestUrl
# 最新章节名
latestChapter = lastText.get_text()
...
④配置文件

配置文件其实就是一个json文件,包含了小说名称和在小说网站的url。
一个典型的模板:

1
2
3
4
5
6
7
8
9
10
{
"shengxu": {
"chineseName": "圣墟",
"bookUrl": "https://www.biqiuge.com/book/4772/"
},
"feijianwendao": {
"chineseName": "飞剑问道",
"bookUrl": "https://www.biqiuge.com/book/24277/"
}
}
⑤最终提交的post信息
1
2
3
4
5
6
{
"bookName" : "圣墟",
"latestChapter" : "第1291章 阳间风云激荡",
"updateTime" : "2018-11-06 00:06:26",
"latestUrl":"https://www.biqiuge.com/book/4772/26795186.html"
}

2. Wechat Token Server

github源码地址:https://github.com/shangyexin/wechat-token-server 微信token中控服务器,用于统一获取并缓存微信开发中使用的access_token和jsticket。 这么做主要是微信的官方建议:

公众号开发者应使用中控服务器统一获取和刷新Access_token,其他业务逻辑服务器所使用的access_token均来自于该中控服务器,不应该各自去刷新,否则容易造成冲突,导致access_token覆盖而影响业务;

因为文字篇幅所限,关于微信token中控服务器的实现我单独写了一篇文章,大家可以点击下面的链接阅读: 使用python中tornado框架实现的微信access_token中控服务器

3. 微信公众号

github源码地址:https://github.com/shangyexin/novel-update-monitor-account 因为只有服务号才能使用模板消息功能,所以我们选择“开箱即用”的微信公众平台测试账号。 点击下面的链接,无需注册即可登录使用: https://mp.weixin.qq.com/debug/cgi-bin/sandbox?t=sandbox/login

①接口配置

关于微信公众平台的接口配置详细步骤大家可以参阅微信官方文档。 这里只讲述一下操作步骤,首先填入公众号服务器响应的的URL和使用的Token: configure 其中URL中的IP地址是你的服务器IP地址,后面的/api/wechat是你的公众号中响应微信Token验证的路径。可以使用Nginx反向代理到Tornado的运行端口(在config.py中配置)。下面是Nginx配置示例:

1
2
3
4
5
6
7
8
server {
listen 80;
server_name 35.234.33.8;

location ~ ^/api/wechat {
proxy_pass http://127.0.0.1:12126;
}
}

Token填入你在config.py中配置token即可。 下面是对应的Python代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 校验微信服务器签名
class VerifyWechatSignHandler(tornado.web.RequestHandler):
def get(self):
signature = self.get_argument('signature', 'UnKnown')
timestamp = self.get_argument('timestamp', 'UnKnown')
nonce = self.get_argument('nonce', 'UnKnown')
echostr = self.get_argument('echostr', 'UnKnown')
if (signature == 'UnKnown' or timestamp == 'UnKnown' or nonce == 'UnKnown' or echostr ==
'UnKnown'):
logger.error('Not enough parameters.')
self.write("Invalid request : not enough parameters")
else:
try:
# 排序
params = [config.token, timestamp, nonce]
params.sort()
# sha1加密
hash_sha1 = hashlib.sha1(''.join(params).encode('utf-8'))
sign = hash_sha1.hexdigest()
# 对比
if (signature == sign):
logger.info('Correct wechat signature.')
self.write(echostr)
else:
logger.error('Wrong wechat signature.')
self.write("Invalid request : wrong wechat signature")
except Exception as e:
logger.error(e)

def makeApp():
return tornado.web.Application([
(r"/api/wechat", VerifyWechatSignHandler)
])
②关注测试账号,获得userid

配置完接口后,我们需要关注测试账号,获得我们的userid,将userid填入到config.py中.,具体对应的是下面的touser

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
notice = {
"touser": "oQHU46Djs5O3yhsTmYGvDz_Hi0vo",
"template_id": "oKa0UsZ6xvSlnFChlGGdMMH1O_yq2l91G-sIQPRg2BI",
"url": "",
"topcolor": "#FF0000",
"data": {
"first": {
"value": "您订阅的小说更新啦!",
"color": "#173177"
},
"novelName": {
"value": "",
"color": "#173177"
},
"sectionName": {
"value": "",
"color": "#173177"
},
"updateTime": {
"value": "",
"color": "#173177"
},
"remark": {
"value": "点击详情立刻阅读最新章节↓↓↓",
"color": "#173177"
}
}
③新增测试模板

点击新增测试模板,输入模板标题和模板内容即可创建,需要注意的是模板需符合微信规定语法,我添加的与上面配置中对应的测试模板如下: {{first.DATA}} 作品名称:{{novelName.DATA}} 最新章节:{{sectionName.DATA}} 更新时间:{{updateTime.DATA}} {{remark.DATA}} 模板添加完成后会生成模板ID,填入config.py中的template_id

④接收更新post并发送给用户

截止到当前步骤,所有的配置已经完成,我们只需要接收指定URL的post消息,解析完成后,填入模板中发送给用户即可。 接收消息的URL同样可以在Nginx中配置:

1
2
3
4
5
6
7
8
server {
listen 80;
server_name 35.234.33.8;

location ~ ^/api/novelupdate {
proxy_pass http://127.0.0.1:12126;
}
}

接收并解析更新post的Python代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 接收小说更新推送通知
class NovelUpdateHandler(tornado.web.RequestHandler):
def post(self):
try:
bookName = self.get_argument('bookName', 'UnKnown')
latestChapter = self.get_argument('latestChapter', 'UnKnown')
updateTime = self.get_argument('updateTime', 'UnKnown')
latestUrl = self.get_argument('latestUrl', 'UnKnown')

if (bookName == 'UnKnown' or latestChapter == 'UnKnown'
or updateTime == 'UnKnown' or latestUrl == 'UnKnown'):
logger.error('Wrong wechat signature.')
self.write("Invalid request : wrong novel update post info")
else:
config.notice['data']['novelName']['value'] = bookName
config.notice['data']['sectionName']['value'] = latestChapter
config.notice['data']['updateTime']['value'] = updateTime
config.notice['url'] = latestUrl
# print(config.notice)
putIntoQueue(config.notice)
self.write("success")
except Exception as e:
logger.error(e)

给用户发送消息的Python代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 通知用户
def notifyUser(data):
accessToken = getAccessToken()
if accessToken is not None:
notifyUrl = config.baseNotifyUrl + accessToken
jsonData = json.dumps(data)
# print(notifyUrl)
try:
request = tornado.httpclient.HTTPRequest(notifyUrl, method='POST', body=jsonData)
syncHttpClient = tornado.httpclient.HTTPClient()
response = syncHttpClient.fetch(request)
except Exception as e:
logger.error(e)
logger.error('Notify wechat user failed.')
else:
logger.info('Notify wechat user success, response is %s.', response.body)
syncHttpClient.close()
else:
config.notificationQueue.append(data)

五、运行与部署

1. Tmux

这里给大家推荐一款我一直在用的Linux终端复用神器,相信很多人也使用过,那就是Tmux。 Tmux可用于在一个终端窗口中运行多个终端会话。不仅如此,还可以通过 Tmux 使终端会话运行于后台或是按需接入、断开会话,这个功能非常实用。这样就可以不用使用nohup&在后台运行Python程序。 ubuntu版本下直接apt-get安装:sudo apt-get install tmux centos7版本下直接yum安装:yum install -y tmux

2. 程序运行

Tmux安装完成后,我们就可以更简单地运行我们的代码了! 运行步骤: 1. git clone源码。 2. 使用pip3 install -r requirements.txt安装依赖项 3. 修改config.py进行相关配置。 4. 使用tmux new -s name_by_you创建一个新的会话 5. 输入命令python3 main.py,注意需要使用的为Python的3.x版本,不支持2.0版本。 6. 按下tmux快捷键ctr + b,再按下键盘d键,即可实现程序在后台运行。 完成上面所有步骤后你还不放心的话,可以关闭终端后再次进入,使用ps -axu grep python3查看是否还有你的python进程。 另外在终端使用命令tmux a -t name_by_you可以恢复在后台运行的tmux会话。

3. 部署建议

由实现架构图可以看出,爬虫程序、微信token服务器、微信公众号服务程序其实是互相独立的,他们通过http协议进行交互,所以它们可以独立的部署在不同的服务器上。 我是将微信token服务器、微信公众号服务程序部署在阿里云的服务器上,这样稳定性可以保证。 爬虫程序则部署在了免费试用一年的GCP(Google Cloud Platform)服务器上(亚马逊的AWS也可以免费试用一年),主要是防止爬虫程序被小说源站封IP,GCP和AWS都使用的弹性IP,IP如果被封的话可以重启服务器即可获得新的IP。

六、写在最后

由于水平和精力有限(程序都是下班后在家利用休息时间现学现卖写的,而且最近家里刚添了小宝宝),我知道代码肯定存在很多bug以及可以优化的地方(大神轻喷),但是已经可以达到当初我所期望的功能了,我已经稳定使用一个多月了,期间只因为Wechat Token Server的一个bug(暂未修复)宕机过一次,最后还是万能的重启程序解决了。 最后我想说的是,写代码实现自己的需求,真的是一件很幸福的事。 Just do IT。


使用Python实现自己的小说追更公众号教程
https://www.shangyexin.com/2019/01/21/novel-monitor/
作者
Yasin
发布于
2019年1月21日
许可协议