失眠网,内容丰富有趣,生活中的好帮手!
失眠网 > 爬虫入门实战:斗鱼弹幕数据抓取 附送11节入门笔记

爬虫入门实战:斗鱼弹幕数据抓取 附送11节入门笔记

时间:2019-07-06 13:46:54

相关推荐

爬虫入门实战:斗鱼弹幕数据抓取 附送11节入门笔记

斗鱼弹幕

学习目标

掌握asyncore模块使用实现斗鱼弹幕数据抓取

预备知识

asyncore模块

介绍

这个模块为异步socket的服务器客户端通信提供简单的接口。该模块提供了异步socket服务客户端和服务器的基础架构。

相比python原生的socket api,asyncore具备有很大的优势,asyncore对原生的socket进行封装,提供非常简洁优秀的接口,利用asyncore覆写相关需要处理的接口方法,就可以完成一个socket的网络编程,从而不需要处理复杂的socket网络状况以及多线程处理等等。

实现流程

搭建 Socket 服务器环境

nc -l 9000

客户端 Socket 开发基本使用

定义类继承自asyncore.dispatcher

class SocketClient(asyncore.dispatcher):

实现类中的回调代码

实现构造函数

调用父类方法

asyncore.dispatcher.__init__(self)

创建 Socket 对象

self.create_socket()

连接服务器

address = (host,port)self.connect(address)

实现handle_connect回调函数

Socket连接服务器成功时回调该函数

def handle_connect(self):print("连接成功")

实现writable回调函数

描述是否有数据需要被发送到服务器。返回值为True表示可写,False表示不可写,如果不实现默认返回为True,当返回True时,回调函数handle_write将被触发

def writable(self):return True

实现handle_write回调函数

当有数据需要发送时(writable回调函数返回True时),该函数被触发,通常情况下在该函数中编写 send 方法发送数据

def handle_write(self):# 内部实现对服务器发送数据的代码# 调用 send 方法发送数据,参数是字节数据self.send('hello world\n'.encode('utf-8'))

实现readable回调函数

描述是否有数据从服务端读取。返回True表示有数据需要读取,False表示没有数据需要被读取,当不实现默认返回为True,当返回True时,回调函数handle_read将被触发

def readable(self):return True

实现handle_read回调函数

当有数据需要读取时触发(readable回调函数返回True时),该函数被触发,通常情况下在该函数中编写recv方法接收数据

def handle_read(self):# 主动接收数据,参数是需要接收数据的长度# 返回的数据是字节数据result = self.recv(1024)print(result)

实现handle_error回调函数

当程序运行过程发生异常时回调

def handle_error(self):# 编写处理错误方法t,e,trace = sys.exc_info()self.close()

实现handle_close回调函数

当连接被关闭时触发

def handle_close(self):print("连接关闭")self.close()

创建对象并且执行asyncore.loop进入运行循环

timeout表示一次循环所需要的时长

client = SocketClient('127.0.0.1',9000)# 开始启动运行循环asyncore.loop(timeout=5)

斗鱼弹幕实战

文档资料
斗鱼弹幕服务器第三方接入协议v1.6.2.pdf官方提供协议文档
弹幕客户端开发流程
连接初始化 使用TCP连接服务器 IP地址:端口:8601 客户端向弹幕服务器发送登录请求,登录弹幕服务器弹幕服务器收到客户端登录请求并完成登录后,返回登录成功消息给客户端客户端收到登录成功消息后发送进入弹幕分组请求给弹幕服务器弹幕服务器接受到客户端弹幕分组请求后将客户端添加到请求指定的弹幕分组中 服务过程 客户端每隔 45 秒发送心跳给弹幕服务器,弹幕服务器回复心跳信息给客户端弹幕服务器如有广播信息,则推送给客户端,服务器消息协议 断开连接 客户端发送登出消息客户端关闭 TCP 连接
数据发送和接收流程
数据包讲解
消息长度:4 字节小端整数,表示整条消息(包括自身)长度(字节数)。 消息长度出现两遍,二者相同。消息类型:2 字节小端整数,表示消息类型。取值如下: 689 客户端发送给弹幕服务器的文本格式数据690 弹幕服务器发送给客户端的文本格式数据。 加密字段:暂时未用,默认为 0。保留字段:暂时未用,默认为 0。数据部分:斗鱼独创序列化文本数据,结尾必须为‘\0’。详细序列化、反

序列化算法见下节。(所有协议内容均为 UTF-8 编码)

数据包封装

对数据包进行对象化封装,对数据的封装方便以后使用,实现对象和二进制数据之间的转换

通过参数构建数据包对象实现获取数据包长度的方法实现获取二进制数据的方法
实现发送数据包

构建发送数据包的容器

self.send_queue = Queue()

实现回调函数,判断容器中有数据就发送没有数据不发送

def writable(self):return self.send_queue.qsize() > 0def handle_write(self):# 从发送数据包队列中获取数据包对象dp = self.send_queue.get()# 获取数据包的长度,并且发送给服务器dp_length = dp.get_length()dp_length_data = dp_length.to_bytes(4,byteorder='little',signed=False)self.send(dp_length_data)# 发送数据包二进制数据self.send(dp.get_bytes())self.send_queue.task_done()pass

实现登录函数

构建登录数据包

content = "type@=loginreq/roomid@={}/".format(room_id)login_dp = DataPacket(DATA_PACKET_TYPE_SEND,content=content)

把数据包添加到发送数据包容器中

# 把数据包添加到发送数据包容器中self.send_queue.put(login_dp)

实现接收数据

构建接收数据包队列

# 存放接收的数据包对象self.recv_queue = Queue()

读取回调函数中读取数据

读取长度

# 读取长度,二进制数据data_length_data = self.recv(4)# 通过二进制获取length 具体数据data_length = int.from_bytes(data_length_data,byteorder='little',signed=False)

读取内容

# 通过数据包的长度获取数据data = self.recv(data_length)

构建数据包对象

数据包构造函数中解析二进制来构建数据包对象

self.type = int.from_bytes(data_bytes[4:6],byteorder='little',signed=False)self.encrypt_flag = int.from_bytes(data_bytes[6:7],byteorder='little',signed=False)self.preserve_flag = int.from_bytes(data_bytes[7:8],byteorder='little',signed=False)# 构建数据部分self.content = str(data_bytes[8:-1],encoding='utf-8')

通过二进制数据构建数据包对象

# 通过二进制数据构建数据包对象dp = DataPacket(data_bytes=data)

把数据包放入接收数据包容器中

# 把数据包放入接收数据包容器中self.recv_queue.put(dp)

构建处理线程专门处理接收数据包容器中数据

构建线程

# 构建一个专门处理接收数据包容器中的数据包的线程self.callback_thread = threading.Thread(target=self.do_callback)self.callback_thread.setDaemon(True)self.callback_thread.start()

实现回调函数处理接收的数据包

def do_callback(self):'''专门负责处理接收数据包容器中的数据:return: '''while True:# 从接收数据包容器中获取数据包dp = self.recv_queue.get()# 对数据进行处理print(dp.content)pass

实现外部传入回调函数

通过外部指定回调函数实现自定义数据处理

添加参数callback

构造函数中添加参数

def __init__(self,host,port,callback=None):# 定义外部传入的自定义回调函数self.callback = callback

外部传入自定义回调函数

def data_callback(dp):'''自定义回调函数:param dp: 数据包对象:return: '''print("data_callback:",dp.content)passif __name__ == '__main__':client = DouyuClient('',8601,callback=data_callback)client.login_room_id(4494106)asyncore.loop(timeout=10)

在处理接收数据包的线程中调用回调函数

def do_callback(self):'''专门负责处理接收数据包容器中的数据:return: '''while True:# 从接收数据包容器中获取数据包dp = self.recv_queue.get()# 对数据进行处理if self.callback is not None:self.callback(dp)self.recv_queue.task_done()

数据内容序列化与反序列化
键 key 和值 value 直接采用‘@=’分割数组采用‘/’分割如果 key 或者 value 中含有字符‘/’,则使用‘@S’转义如果 key 或者 value 中含有字符‘@’,使用‘@A’转义

例子

多个键值对数据:key1@=value1/key2@=value2/key3@=value3/

数组数据:value1/value2/value3/

登录

参看斗鱼弹幕文档

加入弹幕分组

参看斗鱼弹幕文档,-9999为海量弹幕

心跳机制

作用是让服务器解决假死连接问题,客户端必须每隔45秒发送一次请求,否则就会被主动断开。

实现发送心跳函数 构建心跳数据包把数据包添加到发送数据包容器队列中 构建心跳线程 构建心跳线程添加触发机制添加暂停机制

代码实现

#!/usr/bin/python3# -*- coding: utf-8 -*-import asyncoreimport sysimport threadingimport timefrom queue import QueueDATA_PACKET_TYPE_SEND = 689DATA_PACKET_TYPE_RECV = 690def encode_content(content):'''序列化函数:param content: 需要序列化的内容 :return: '''if isinstance(content,str):return content.replace(r'@',r'@A').replace(r'/',r'@S')elif isinstance(content,dict):return r'/'.join(["{}@={}".format(encode_content(k),encode_content(v)) for k,v in content.items()]) + r'/'elif isinstance(content,list):return r'/'.join([encode_content(data) for data in content]) + r'/'return ""def decode_to_str(content):'''反序列化字符串:param content:字符串数据 :return: '''if isinstance(content,str):return content.replace(r'@S',r'/').replace('@A',r'@')return ""def decode_to_dict(content):'''反序列化字典数据:param content: 被序列化的字符串:return: '''ret_dict = dict()if isinstance(content,str):item_strings = content.split(r'/')for item_string in item_strings:k_v_list = item_string.split(r'@=')if k_v_list is not None and len(k_v_list) > 1:k = k_v_list[0]v = k_v_list[1]ret_dict[decode_to_str(k)] = decode_to_str(v)return ret_dictdef decode_to_list(content):'''反序列化列表数据:param content: 被序列化的字符串:return: '''ret_list = []if isinstance(content,str):items = content.split(r'/')for idx,item in enumerate(items):if idx < len(items) - 1:ret_list.append(decode_to_str(item))return ret_listclass DataPacket():def __init__(self,type=DATA_PACKET_TYPE_SEND,content="",data_bytes=None):if data_bytes is None:# 数据包的类型self.type = type# 数据部分内容self.content = contentself.encrypt_flag = 0self.preserve_flag = 0else:self.type = int.from_bytes(data_bytes[4:6],byteorder='little',signed=False)self.encrypt_flag = int.from_bytes(data_bytes[6:7],byteorder='little',signed=False)self.preserve_flag = int.from_bytes(data_bytes[7:8],byteorder='little',signed=False)# 构建数据部分self.content = str(data_bytes[8:-1],encoding='utf-8')def get_length(self):'''获取当前数据包的长度,为以后需要发送数据包做准备:return: '''return 4 + 2 + 1 + 1 + len(self.content.encode('utf-8')) + 1def get_bytes(self):'''通过数据包转换成 二进制数据类型:return: '''data = bytes()# 构建 4 个字节的消息长度数据data_packet_length = self.get_length()# to_bytes 把一个整型数据转换成二进制数据# 第一个参数 表示需要转换的二进制数据占几个字节# byteorder 第二个参数 描述字节序# signed 设置是否有符号# 处理消息长度data += data_packet_length.to_bytes(4,byteorder='little',signed=False)# 处理消息类型data += self.type.to_bytes(2,byteorder='little',signed=False)# 处理加密字段data += self.encrypt_flag.to_bytes(1,byteorder='little',signed=False)# 处理保留字段data += self.preserve_flag.to_bytes(1,byteorder='little',signed=False)# 处理数据内容data += self.content.encode('utf-8')# 添加 \0 数据data += b'\0'return dataclass DouyuClient(asyncore.dispatcher):def __init__(self,host,port,callback=None):# 构建发送数据包的队列容器# 存放了数据包对象self.send_queue = Queue()# 存放接收的数据包对象self.recv_queue = Queue()# 定义外部传入的自定义回调函数self.callback = callbackasyncore.dispatcher.__init__(self)self.create_socket()address = (host,port)self.connect(address)# 构建一个专门处理接收数据包容器中的数据包的线程self.callback_thread = threading.Thread(target=self.do_callback)self.callback_thread.setDaemon(True)self.callback_thread.start()# 构建心跳线程self.heart_thread = threading.Thread(target=self.do_ping)self.heart_thread.setDaemon(True)self.ping_runing = Falsepassdef handle_connect(self):print("连接成功")self.start_ping()def writable(self):return self.send_queue.qsize() > 0def handle_write(self):# 从发送数据包队列中获取数据包对象dp = self.send_queue.get()# 获取数据包的长度,并且发送给服务器dp_length = dp.get_length()dp_length_data = dp_length.to_bytes(4,byteorder='little',signed=False)self.send(dp_length_data)# 发送数据包二进制数据self.send(dp.get_bytes())self.send_queue.task_done()passdef readable(self):return Truedef handle_read(self):# 读取长度,二进制数据data_length_data = self.recv(4)# 通过二进制获取length 具体数据data_length = int.from_bytes(data_length_data,byteorder='little',signed=False)# 通过数据包的长度获取数据data = self.recv(data_length)# 通过二进制数据构建数据包对象dp = DataPacket(data_bytes=data)# 把数据包放入接收数据包容器中self.recv_queue.put(dp)def handle_error(self):t, e, trace = sys.exc_info()print(e)self.close()def handle_close(self):self.stop_ping()print("连接关闭")self.close()def login_room_id(self,room_id):self.room_id = room_idsend_data = {"type":"loginreq","roomid":str(room_id)}# 构建登录数据包content = encode_content(send_data)login_dp = DataPacket(DATA_PACKET_TYPE_SEND,content=content)# 把数据包添加到发送数据包容器中self.send_queue.put(login_dp)def join_room_group(self):'''加入弹幕分组:return: '''send_data = {"type":"joingroup","rid":str(self.room_id),"gid":'-9999'}content = encode_content(send_data)dp = DataPacket(type=DATA_PACKET_TYPE_SEND,content=content)self.send_queue.put(dp)passdef send_heart_data_packet(self):send_data = {"type":"mrkl"}content = encode_content(send_data)dp = DataPacket(type=DATA_PACKET_TYPE_SEND,content=content)self.send_queue.put(dp)def start_ping(self):'''开启心跳:return: '''self.ping_runing = Truedef stop_ping(self):'''结束心跳:return: '''self.ping_runing = Falsedef do_ping(self):'''执行心跳:return: '''while True:if self.ping_runing:self.send_heart_data_packet()time.sleep(40)def do_callback(self):'''专门负责处理接收数据包容器中的数据:return: '''while True:# 从接收数据包容器中获取数据包dp = self.recv_queue.get()# 对数据进行处理if self.callback is not None:self.callback(self,dp)self.recv_queue.task_done()passdef data_callback(client,dp):'''自定义回调函数:param dp: 数据包对象:return: '''resp_data = decode_to_dict(dp.content)if resp_data["type"] == "loginres":#调用加入分组请求print("登录成功:",resp_data)client.join_room_group()elif resp_data["type"] == "chatmsg":print("{}:{}".format(resp_data["nn"],resp_data["txt"]))elif resp_data["type"] == 'onlinegift':print("暴击鱼丸")elif resp_data["type"] == "uenter":print("{} 进入了房间".format(resp_data["nn"]))passif __name__ == '__main__':client = DouyuClient('',8601,callback=data_callback)client.login_room_id(4494106)asyncore.loop(timeout=10)

如果觉得《爬虫入门实战:斗鱼弹幕数据抓取 附送11节入门笔记》对你有帮助,请点赞、收藏,并留下你的观点哦!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。