-
Notifications
You must be signed in to change notification settings - Fork 1
/
BinanceAPI.py
115 lines (95 loc) · 3.79 KB
/
BinanceAPI.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
from urllib.parse import urlencode
import time
import hashlib
import requests
import hmac
class Binance:
    """Thin HTTP client for a handful of Binance REST endpoints.

    Public market-data calls are sent unsigned. The private helpers
    (``_get``, ``_post``, ``_delete``) add ``recvWindow``, a millisecond
    ``timestamp`` and an HMAC-SHA256 ``signature`` to the request and send
    the API key in the ``X-MBX-APIKEY`` header.

    Every request uses a 30 second timeout with TLS verification enabled,
    and every method returns the decoded JSON body of the response.
    """

    BASE_URL = "https://www.binance.com/api/v1"
    BASE_URL_V3 = "https://api.binance.us/api/v3"
    PUBLIC_URL = "https://www.binance.com/exchange/public/product"

    def __init__(self, key=None, secret=None, recv_window=5000):
        """Store the credentials used by the signed request helpers.

        Args:
            key: API key sent in the ``X-MBX-APIKEY`` header.
            secret: API secret used as the HMAC key when signing.
            recv_window: Value sent as ``recvWindow`` on signed requests.
                The 5000 default is intended to match the default that the
                Binance API documents for this parameter.
        """
        self.key = key
        self.secret = secret
        self.recv_window = recv_window

    # ------------------------------------------------------------------
    # Public (unsigned) endpoints
    # ------------------------------------------------------------------

    def ping(self):
        """GET ``<BASE_URL_V3>/ping`` and return the decoded JSON body."""
        path = "%s/ping" % self.BASE_URL_V3
        return requests.get(path, timeout=30, verify=True).json()

    def get_history(self, symbol, limit=50):
        """GET ``<BASE_URL>/historicalTrades`` for ``symbol``.

        Args:
            symbol: Value sent as the ``symbol`` query parameter.
            limit: Value sent as the ``limit`` query parameter.
        """
        path = "%s/historicalTrades" % self.BASE_URL
        params = {"symbol": symbol, "limit": limit}
        return self._get_no_sign(path, params)

    def get_trades(self, market, limit=50):
        """GET ``<BASE_URL>/trades`` for ``market``.

        Args:
            market: Value sent as the ``symbol`` query parameter.
            limit: Value sent as the ``limit`` query parameter.
        """
        path = "%s/trades" % self.BASE_URL
        # The parameter is named ``market`` here; the API field is "symbol".
        params = {"symbol": market, "limit": limit}
        return self._get_no_sign(path, params)

    def get_candles(self, symbol, interval, startTime, endTime):
        """GET ``<BASE_URL_V3>/klines`` for ``symbol``.

        Each returned candle is a list laid out as:

            [0]  Open time
            [1]  Open
            [2]  High
            [3]  Low
            [4]  Close
            [5]  Volume
            [6]  Close time
            [7]  Quote asset volume
            [8]  Number of trades
            [9]  Taker buy base asset volume
            [10] Taker buy quote asset volume
            [11] Ignore

        Args:
            symbol: Value sent as the ``symbol`` query parameter.
            interval: Value sent as the ``interval`` query parameter.
            startTime: Value sent as the ``startTime`` query parameter.
            endTime: Value sent as the ``endTime`` query parameter.
        """
        path = "%s/klines" % self.BASE_URL_V3
        params = {
            "symbol": symbol,
            "interval": interval,
            "startTime": startTime,
            "endTime": endTime,
        }
        return self._get_no_sign(path, params)

    def get_ticker(self, market):
        """GET ``<BASE_URL>/ticker/24hr`` for ``market``."""
        path = "%s/ticker/24hr" % self.BASE_URL
        params = {"symbol": market}
        return self._get_no_sign(path, params)

    def get_order_book(self, symbol, limit=50):
        """GET ``<BASE_URL>/depth`` for ``symbol``.

        Args:
            symbol: Value sent as the ``symbol`` query parameter.
            limit: Value sent as the ``limit`` query parameter.
        """
        path = "%s/depth" % self.BASE_URL
        params = {"symbol": symbol, "limit": limit}
        return self._get_no_sign(path, params)

    # ------------------------------------------------------------------
    # Request plumbing
    # ------------------------------------------------------------------

    def _get_no_sign(self, path, params=None):
        """Send an unsigned GET with ``params`` URL-encoded into the query."""
        query = urlencode(params or {})
        url = "%s?%s" % (path, query)
        return requests.get(url, timeout=30, verify=True).json()

    def _sign(self, params=None):
        """Return a copy of ``params`` with ``timestamp`` and ``signature``.

        The signature is the hex HMAC-SHA256 of the URL-encoded parameters
        (including the timestamp), keyed with ``self.secret``. The input
        mapping is not modified.

        Raises:
            ValueError: If no API secret was configured.
        """
        if self.secret is None:
            raise ValueError("an API secret is required to sign requests")
        data = dict(params or {})
        data["timestamp"] = int(1000 * time.time())
        payload = urlencode(data).encode("utf-8")
        # "signature" is added last so it is also last in the encoded query.
        data["signature"] = hmac.new(
            self.secret.encode(), msg=payload, digestmod=hashlib.sha256
        ).hexdigest()
        return data

    def _signed_query(self, params=None):
        """Return the URL-encoded, signed query string for ``params``.

        Works on a copy so neither the caller's mapping nor any default
        value is mutated between calls.
        """
        payload = dict(params or {})
        payload["recvWindow"] = self.recv_window
        return urlencode(self._sign(payload))

    def _auth_header(self):
        """Return the header carrying the API key for signed requests."""
        return {"X-MBX-APIKEY": self.key}

    def _get(self, path, params=None):
        """Send a signed GET request and return the decoded JSON body."""
        url = "%s?%s" % (path, self._signed_query(params))
        return requests.get(
            url, headers=self._auth_header(), timeout=30, verify=True
        ).json()

    def _post(self, path, params=None):
        """Send a signed POST (signed query as the body) and return JSON."""
        query = self._signed_query(params)
        return requests.post(
            path,
            headers=self._auth_header(),
            data=query,
            timeout=30,
            verify=True,
        ).json()

    def _delete(self, path, params=None):
        """Send a signed DELETE request and return the decoded JSON body."""
        url = "%s?%s" % (path, self._signed_query(params))
        return requests.delete(
            url, headers=self._auth_header(), timeout=30, verify=True
        ).json()

    # ------------------------------------------------------------------
    # Order helpers
    # ------------------------------------------------------------------

    def _order(self, market, quantity, side, rate=None):
        """Build the parameter dict describing an order.

        Args:
            market: Value stored under ``"symbol"``.
            quantity: Order size; stored with eight decimal places.
            side: Value stored under ``"side"``.
            rate: Limit price. When given, the order is a ``LIMIT`` order
                with ``timeInForce`` ``GTC``; when ``None`` it is a
                ``MARKET`` order with no price.

        Returns:
            dict: The order parameters (not yet signed or sent).
        """
        params = {}
        if rate is not None:
            params["type"] = "LIMIT"
            params["price"] = self._format(rate)
            params["timeInForce"] = "GTC"
        else:
            params["type"] = "MARKET"
        params["symbol"] = market
        params["side"] = side
        params["quantity"] = self._format(quantity)
        return params

    def _format(self, price):
        """Render ``price`` as a fixed-point string with eight decimals."""
        return "{:.8f}".format(price)