12306抢票脚本 python
时间: 2023-09-22 20:15:20 浏览: 106
抢票脚本的编写需要考虑很多因素，包括网络请求、验证码识别、多线程并发等。以下是一个简单的抢票脚本示例，仅供参考（示例中的 `{xxxxxx}` 是被省略的请求头和表单数据占位符，运行前需要自行补全）：
```python
import requests
import time
import json
import re
import threading
class TrainTicket(object):
    """Skeleton of a 12306 ticket-booking flow driven over one HTTP session.

    The flow implemented by :meth:`run` is: log in, poll the left-ticket
    query, and for each bookable train walk through submit-order ->
    token -> check-order -> queue-count -> confirm.

    NOTE: every ``{xxxxxx}`` in this class is a redaction placeholder
    carried over from the original listing. ``xxxxxx`` is not defined
    anywhere in the visible source, so those lines raise ``NameError``
    until they are replaced with the real request headers / form payloads.
    """

    def __init__(self, train_date, from_station, to_station, purpose_codes,
                 passengers, interval=1, timeout=10):
        """Store the query parameters and open a ``requests`` session.

        Args:
            train_date: Value sent as ``leftTicketDTO.train_date``.
            from_station: Value sent as ``leftTicketDTO.from_station``.
            to_station: Value sent as ``leftTicketDTO.to_station``.
            purpose_codes: Value sent as ``purpose_codes``.
            passengers: Iterable of dicts; ``run`` reads the keys
                ``passenger_name``, ``passenger_id_no`` and
                ``passenger_type`` from each one.
            interval: Seconds to sleep between polling rounds.
            timeout: Per-request timeout in seconds handed to ``requests``
                so that a stalled connection cannot block the loop forever.
        """
        self.train_date = train_date
        self.from_station = from_station
        self.to_station = to_station
        self.purpose_codes = purpose_codes
        self.passengers = passengers
        self.interval = interval
        self.timeout = timeout
        self.session = requests.Session()
        # Placeholder request headers (see class docstring).
        self.headers = {xxxxxx}

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _send(self, method, url, data=None):
        """Send one request; return the response, or ``None`` on a network error."""
        sender = self.session.post if method == 'POST' else self.session.get
        kwargs = {'headers': self.headers, 'timeout': self.timeout}
        if data is not None:
            kwargs['data'] = data
        try:
            return sender(url, **kwargs)
        except requests.RequestException as exc:
            # Timeouts / connection resets are reported and then treated by
            # the callers like any other failed step.
            print('网络请求失败:{}'.format(exc))
            return None

    @staticmethod
    def _parse_json(response):
        """Decode a response body as a JSON object; return ``{}`` if that fails."""
        if response is None:
            return {}
        try:
            parsed = json.loads(response.text)
        except ValueError:
            # json.JSONDecodeError subclasses ValueError; a non-JSON body
            # (for example an HTML error page) lands here.
            return {}
        return parsed if isinstance(parsed, dict) else {}

    @staticmethod
    def _is_success(result):
        """Return True when ``httpstatus`` is 200 and ``status`` is true."""
        return result.get('httpstatus') == 200 and result.get('status') is True

    def _post_json(self, url, data):
        """POST ``data`` to ``url`` and return the decoded JSON object."""
        return self._parse_json(self._send('POST', url, data))

    # ------------------------------------------------------------------
    # Individual steps of the booking flow
    # ------------------------------------------------------------------
    def login(self):
        """Post the login form; return True when ``result_code`` is 0."""
        url = 'https://kyfw.12306.cn/passport/web/login'
        # Placeholder login payload (see class docstring).
        data = {xxxxxx}
        result = self._post_json(url, data)
        if result.get('result_code') == 0:
            print('登录成功')
            return True
        print('登录失败')
        return False

    def get_train_info(self):
        """Query remaining tickets; return ``data['result']`` or ``[]`` on failure."""
        url = 'https://kyfw.12306.cn/otn/leftTicket/queryZ?leftTicketDTO.train_date={}&leftTicketDTO.from_station={}&leftTicketDTO.to_station={}&purpose_codes={}'.format(
            self.train_date, self.from_station, self.to_station, self.purpose_codes)
        result = self._parse_json(self._send('GET', url))
        if self._is_success(result):
            # ``or {}`` also covers a JSON ``"data": null``.
            data = result.get('data') or {}
            return data.get('result', [])
        print('查询车票信息失败')
        return []

    def order_ticket(self, ticket):
        """Submit the order request; return True on success.

        Args:
            ticket: The train entry being booked. It is not read by the code
                as written; presumably it feeds the elided payload.
        """
        url = 'https://kyfw.12306.cn/otn/leftTicket/submitOrderRequest'
        # Placeholder order payload (see class docstring).
        data = {xxxxxx}
        result = self._post_json(url, data)
        if self._is_success(result):
            print('提交订单成功')
            return True
        print('提交订单失败')
        return False

    def get_token(self):
        """Scrape ``globalRepeatSubmitToken`` from the initDc page.

        Returns:
            The token string, or ``''`` when the request fails or the page
            does not contain the token.
        """
        url = 'https://kyfw.12306.cn/otn/confirmPassenger/initDc'
        response = self._send('GET', url)
        page = response.text if response is not None else ''
        found = re.findall(r"globalRepeatSubmitToken = '(.*?)'", page)
        if found:
            return found[0]
        print('获取 token 失败')
        return ''

    def get_passenger_info(self):
        """Return ``data['normal_passengers']``, or ``[]`` on failure."""
        url = 'https://kyfw.12306.cn/otn/confirmPassenger/getPassengerDTOs'
        # Placeholder payload (see class docstring).
        data = {xxxxxx}
        result = self._post_json(url, data)
        if self._is_success(result):
            data = result.get('data') or {}
            return data.get('normal_passengers', [])
        print('获取乘客信息失败')
        return []

    def check_order_info(self):
        """Validate the pending order; return True when ``submitStatus`` is true."""
        url = 'https://kyfw.12306.cn/otn/confirmPassenger/checkOrderInfo'
        # Placeholder payload (see class docstring).
        data = {xxxxxx}
        result = self._post_json(url, data)
        if self._is_success(result) and (result.get('data') or {}).get('submitStatus') is True:
            print('订单信息检查成功')
            return True
        print('订单信息检查失败')
        return False

    def get_queue_count(self):
        """Return ``(ticket, count)`` from the queue query, ``('0', '0')`` on failure."""
        url = 'https://kyfw.12306.cn/otn/confirmPassenger/getQueueCount'
        # Placeholder payload (see class docstring).
        data = {xxxxxx}
        result = self._post_json(url, data)
        if self._is_success(result):
            data = result.get('data') or {}
            return data.get('ticket', '0'), data.get('count', '0')
        print('查询排队信息失败')
        return '0', '0'

    def confirm_order(self):
        """Confirm the queued order; return True when ``submitStatus`` is true."""
        url = 'https://kyfw.12306.cn/otn/confirmPassenger/confirmSingleForQueue'
        # Placeholder payload (see class docstring).
        data = {xxxxxx}
        result = self._post_json(url, data)
        if self._is_success(result) and (result.get('data') or {}).get('submitStatus') is True:
            print('下单成功')
            return True
        print('下单失败')
        return False

    # ------------------------------------------------------------------
    # Driver
    # ------------------------------------------------------------------
    def _try_book_train(self, train):
        """Attempt the full order flow on one train; return True once confirmed."""
        dto = train.get('queryLeftNewDTO', {})
        if dto.get('canWebBuy') != 'Y':
            return False

        train_date = dto.get('start_train_date')
        # NOTE(review): the next three values and the two *_str values further
        # down are not used by the code as written; presumably they are meant
        # to fill the elided request payloads.
        train_no = dto.get('train_no')
        from_station_no = dto.get('from_station_no')
        to_station_no = dto.get('to_station_no')
        seat_types = dto.get('seat_types')
        if not train.get('secretStr', ''):
            return False

        # NOTE(review): passengers are filtered by membership in the
        # space-separated 'lishi' field, exactly as in the original listing;
        # confirm that this is the intended field. A missing field now means
        # "no passenger matches" instead of an AttributeError.
        listed_names = (dto.get('lishi') or '').split(' ')
        first_seat_type = (seat_types or '').split(',')[0]

        for passenger in self.passengers:
            if passenger.get('passenger_name') not in listed_names:
                continue
            print('开始抢票:{} {}->{}'.format(train_date, self.from_station, self.to_station))
            if not self.order_ticket(train):
                continue
            token = self.get_token()
            if not token:
                continue
            passenger_ticket_str = '{},0,{},{},{},N'.format(first_seat_type, passenger.get('passenger_name'), passenger.get('passenger_id_no'), passenger.get('passenger_type'))
            old_passenger_str = '{},{},{},1_'.format(passenger.get('passenger_name'), passenger.get('passenger_id_no'), passenger.get('passenger_type'))
            # Placeholder payload (see class docstring).
            data = {xxxxxx}
            if not self.check_order_info():
                continue
            ticket_count, queue_count = self.get_queue_count()
            if int(ticket_count) == int(queue_count) == 0:
                print('排队人数:{},当前无票'.format(queue_count))
                continue
            if not self.confirm_order():
                continue
            return True
        return False

    def run(self):
        """Log in, then poll until one order is confirmed.

        Returns immediately if login fails. Otherwise each round queries the
        trains, tries to book each one in turn, and sleeps ``interval``
        seconds before the next round.
        """
        if not self.login():
            return
        while True:
            for train in self.get_train_info():
                if self._try_book_train(train):
                    return
            time.sleep(self.interval)
if __name__ == '__main__':
    # Example invocation: build one TrainTicket from hard-coded sample values
    # and start its polling loop.
    train_date = '2022-01-01'
    from_station = 'BJP'
    to_station = 'SHH'
    purpose_codes = 'ADULT'
    # Each passenger is a dict with a name, an ID number and a type code.
    # The ID numbers below are masked sample values.
    passengers = [
        {
            'passenger_name': '张三',
            'passenger_id_no': 'xxxxxxxxxxxxxxx',
            'passenger_type': '1',
        },
        {
            'passenger_name': '李四',
            'passenger_id_no': 'xxxxxxxxxxxxxxx',
            'passenger_type': '1',
        },
    ]
    ticket = TrainTicket(train_date, from_station, to_station, purpose_codes, passengers)
    ticket.run()
```
需要注意的是，这个脚本仅供学习和参考使用，不可用于商业用途和非法用途。在使用抢票脚本时，请遵守相关法律法规和 12306 网站的规定。
阅读全文