对接票通电子发票,票通已给返回响应!
票通返回值中 myres['content']字段又3DES 加密。求一个解密方法。
当前返回效果:
下图解密后效果
import json
import time
import random
import pyDes
import requests
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
from Crypto.Hash import MD5, SHA1, SHA256
import base64
from flask import current_app
import warnings
from Crypto.Cipher import DES3
import codecs
import base64
warnings.filterwarnings("ignore")
# 私钥(与发给票通的公钥为一对)
privateKey = 'MIICdQIBADANBgkqhkiG9w0BAQEFAASCAl8wggJbAgEAAoGBAIVLAoolDaE7m5oMB1ZrILHkMXMF6qmC8I/FCejz4hwBcj59H3rbtcycBEmExOJTGwexFkNgRakhqM+3uP3VybWu1GBYNmqVzggWKKzThul9VPE3+OTMlxeG4H63RsCO1//J0MoUavXMMkL3txkZBO5EtTqek182eePOV8fC3ZxpAgMBAAECgYBp4Gg3BTGrZaa2mWFmspd41lK1E/kPBrRA7vltMfPj3P47RrYvp7/js/Xv0+d0AyFQXcjaYelTbCokPMJT1nJumb2A/Cqy3yGKX3Z6QibvByBlCKK29lZkw8WVRGFIzCIXhGKdqukXf8RyqfhInqHpZ9AoY2W60bbSP6EXj/rhNQJBAL76SmpQOrnCI8Xu75di0eXBN/bE9tKsf7AgMkpFRhaU8VLbvd27U9vRWqtu67RY3sOeRMh38JZBwAIS8tp5hgcCQQCyrOS6vfXIUxKoWyvGyMyhqoLsiAdnxBKHh8tMINo0ioCbU+jc2dgPDipL0ym5nhvg5fCXZC2rvkKUltLEqq4PAkAqBf9b932EpKCkjFgyUq9nRCYhaeP6JbUPN3Z5e1bZ3zpfBjV4ViE0zJOMB6NcEvYpy2jNR/8rwRoUGsFPq8//AkAklw18RJyJuqFugsUzPznQvad0IuNJV7jnsmJqo6ur6NUvef6NA7ugUalNv9+imINjChO8HRLRQfRGk6B0D/P3AkBt54UBMtFefOLXgUdilwLdCUSw4KpbuBPw+cyWlMjcXCkj4rHoeksekyBH1GrBJkLqDMRqtVQUubuFwSzBAtlc'
# 票通公钥(票通提供)
ptPublicKey = 'MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCJkx3HelhEm/U7jOCor29oHsIjCMSTyKbX5rpoAY8KDIs9mmr5Y9r+jvNJH8pK3u5gNnvleT6rQgJQW1mk0zHuPO00vy62tSA53fkSjtM+n0oC1Fkm4DRFd5qJgoP7uFQHR5OEffMjy2qIuxChY4Au0kq+6RruEgIttb7wUxy8TwIDAQAB'
# 3DES秘钥(票通提供)
password = 'lsBnINDxtct8HZB7KCMyhWSJ'
# 请更换请求平台简称(票通提供)
platform_alias = 'DEMO'
# 请更换请求平台编码(票通提供)
platform_code = '11111111'
#RSA签名方法
def RSA_sign(data,privateKey):
private_keyBytes = base64.b64decode(privateKey)
priKey = RSA.importKey(private_keyBytes)
signer = PKCS1_v1_5.new(priKey,)
hash_obj = SHA1.new(data.encode('utf-8'))
signature = base64.b64encode(signer.sign(hash_obj))
signature = signature.decode('utf8')
return signature
#生成请求流水号
def invoiceReqSerialNo():
return platform_alias + str(round(time.time() * 1000)) + str(random.randrange(100, 1000, 100))
#http请求
def Httppost(url, values):
headers = {
"Content-Type": "application/json; charset=UTF-8"
}
response = requests.post(url, data=json.dumps(values), headers=headers).text
return response
#公共报文组装
def assembly(content):
# 格式化成2016-03-20 11:45:39形式
timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
serialNo = str(invoiceReqSerialNo()) + '_12345'
contentstr = str(DES_Encode(password, content), 'utf-8')#加密位置
wait_sign = 'content=' + str(
contentstr) + '&format=JSON&platformCode=' + platform_code + '&serialNo=' + serialNo + '&signType=RSA×tamp=' + timestamp + '&version=1.0'
sign = RSA_sign(str(wait_sign), privateKey)
postjson = {'platformCode': str(platform_code), 'signType': 'RSA', 'sign': str(sign), 'format': 'JSON',
'timestamp': str(timestamp), 'version': '1.0', 'serialNo': str(serialNo),
'content': contentstr}
json.dumps(postjson)
return postjson
#3des加密
def DES_Encode(keys,encrypt):
k = pyDes.triple_des(keys, pyDes.ECB, "\0\0\0\0\0\0\0\0", pad=None, padmode=pyDes.PAD_PKCS5)
d = k.encrypt(encrypt)
return base64.b64encode(d)
#3des解密
def DES_Decrypt(keys,encrypt):
k = pyDes.triple_des(keys, pyDes.ECB, "\0\0\0\0\0\0\0\0", pad=None, padmode=pyDes.PAD_PKCS5)
d = k.decrypt(encrypt)
# d = k.decrypt(binascii.a2b_hex(encrypt), padmode=pyDes.PAD_PKCS5)
return d
class EncryptDate:
def __init__(self, key):
self.key = key # 初始化密钥
self.length = DES3.block_size # 初始化数据块大小
self.aes = DES3.new(self.key, DES3.MODE_CBC, b'12345678') # 初始化AES,ECB模式的实例
# 截断函数,去除填充的字符
self.unpad = lambda date: date[0:-ord(date[-1])]
def pad(self, text):
"""
#填充函数,使被加密数据的字节码长度是block_size的整数倍
"""
count = len(text.encode('utf-8'))
add = self.length - (count % self.length)
entext = text + (chr(add) * add)
return entext
def encrypt(self, encrData): # 加密函数
res = self.aes.encrypt(self.pad(encrData).encode("utf8"))
# msg = str(base64.b64encode(res), encoding="utf8")
msg = res.hex()
return msg
def decrypt(self, decrData): # 解密函数
# res = base64.decodebytes(decrData.encode("utf8"))
res = bytes.fromhex(decrData)
msg = self.aes.decrypt(res).decode("utf8")
return self.unpad(msg)
if __name__ == '__main__':
url = "http://fpkj.testnw.vpiaotong.cn/tp/openapi/invoiceBlue.pt";
taxpayerNum='500102201007206608'#测试环境用此税号,正式环境用商家税号
itemmap1 = {'taxClassificationCode': '1010101020000000000', 'quantity': '1.00', 'goodsName': '货物名称1',
'unitPrice': '56.64', 'invoiceAmount': '56.64', 'taxRateValue': '0.16', 'includeTaxFlag': '0',
'zeroTaxFlag': None, 'preferentialPolicyFlag': None, 'vatSpecialManage': None}
itemmap2 = {'taxClassificationCode': '1010101020000000000', 'quantity': '1.00', 'goodsName': '货物名称2',
'unitPrice': '56.64', 'invoiceAmount': '56.64', 'taxRateValue': '0.16', 'includeTaxFlag': '0',
'zeroTaxFlag': None, 'preferentialPolicyFlag': None, 'vatSpecialManage': None}
itemlist = [itemmap1, itemmap2]
map = {'taxpayerNum': taxpayerNum, 'invoiceReqSerialNo': invoiceReqSerialNo(), 'buyerName': '购买方名称',
'buyerTaxpayerNum': 'XX0000000000000000', 'itemList': itemlist}
content = json.dumps(map)
post=assembly(content)
# print(json.dumps(post))
response=Httppost(url, post)
# print(response)
myres = json.loads(response)
print(myres['content'])