import asyncio
import hmac
import hashlib
import time
import json
import websockets
import datetime
import requests
API_KEY = ""
SECRET_KEY = ""
PASSPHRASE = ""
WS_URL = 'wss://ws.okx.com:8443/ws/v5/private'
def get_okx_server_timestamp():
""" 获取 OKX 服务器时间(毫秒级) """
try:
response = requests.get('https://www.okx.com/api/v5/public/time', timeout=3)
response.raise_for_status()
server_time = int(response.json()['data'][0]['ts']) # 确保是整数
return str(server_time)
except Exception as e:
print(f"获取服务器时间失败: {str(e)}")
return str(int(time.time() * 1000)) # 仅限调试时使用
def generate_signature(secret, message):
""" 生成 HMAC-SHA256 签名 """
return hmac.new(secret.encode('utf-8'), message.encode('utf-8'), hashlib.sha256).hexdigest()
def get_login_message():
""" 生成登录请求数据 """
server_ts = get_okx_server_timestamp() # 获取 OKX 服务器时间
print(f"[时间校验] 服务器时间戳: {server_ts} ({datetime.datetime.fromtimestamp(int(server_ts) / 1000, datetime.UTC)})")
# 生成签名
message = server_ts + "GET" + "/users/self/verify"
signature = generate_signature(SECRET_KEY, message)
return {
"op": "login",
"args": [{
"apiKey": API_KEY,
"passphrase": PASSPHRASE,
"timestamp": server_ts,
"sign": signature
}]
}
async def websocket_client():
""" 连接 WebSocket 并进行登录 """
try:
async with websockets.connect(WS_URL) as ws:
login_msg = json.dumps(get_login_message())
# 发送前稍作延迟,确保时间戳不过期
time.sleep(0.1)
await ws.send(login_msg)
print(f"已发送登录请求: {login_msg}")
# 获取并解析服务器响应
response = json.loads(await ws.recv())
print(f"登录响应: {response}")
if response.get('code') != '0':
print(f"登录失败: {response}")
return
print("✅ 登录成功!开始监听数据...")
# 监听 WebSocket 消息
while True:
msg = await ws.recv()
print(f"📩 收到消息: {msg}")
except websockets.exceptions.ConnectionClosed as e:
print(f"🔴 连接断开: code={e.code}, reason={e.reason}")
except Exception as e:
print(f"⚠️ 错误: {str(e)}")
if __name__ == "__main__":
print("=" * 40)
print("🔍 首次运行时间校验:")
print(f"⏰ 本地系统时间: {datetime.datetime.now()}")
print(f"🌍 UTC 时间: {datetime.datetime.now(datetime.UTC)}")
print("=" * 40)
asyncio.run(websocket_client())
server_time = get_okx_server_timestamp()
print(f"✅ OKX 服务器时间: {server_time} (毫秒)")
print(f"✅ 当前 UTC 时间: {int(time.time() * 1000)} (毫秒)")
我试了很多方法都是时间戳错误,甚至把电脑系统改为UTC时间,也不行