-
Notifications
You must be signed in to change notification settings - Fork 2
/
db.py
138 lines (112 loc) · 3.96 KB
/
db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import asyncpg
import settings
from exchanges import exchange_apis
pool = None # asyncpg connection pool
async def create_tables():
async with pool.acquire() as conn:
await conn.fetch(
'''CREATE TABLE IF NOT EXISTS exchange(
id SERIAL PRIMARY KEY,
name VARCHAR,
url VARCHAR)'''
)
await conn.fetch(
'''CREATE TABLE IF NOT EXISTS subscription(
uid INTEGER NOT NULL,
exchange_id INTEGER REFERENCES exchange (id) NOT NULL,
api_key VARCHAR,
secret_key VARCHAR,
PRIMARY KEY (uid, exchange_id))'''
)
await conn.fetch(
'''CREATE TABLE IF NOT EXISTS user_order(
uid INTEGER NOT NULL,
exchange_id INTEGER REFERENCES exchange (id) NOT NULL,
order_id VARCHAR NOT NULL,
PRIMARY KEY (uid, exchange_id, order_id))'''
)
async def init_db():
global pool
pool = await asyncpg.create_pool(settings.DATABASE_URL)
await create_tables()
await insert_initial_values()
async def insert_initial_values():
exchanges = ((api.api_id, api.name, api.url) for api in exchange_apis)
async with pool.acquire() as conn:
await conn.executemany(
'''INSERT INTO exchange (id, name, url)
VALUES ($1, $2, $3) ON CONFLICT (id) DO NOTHING''',
exchanges
)
async def user_subscriptions(uid):
async with pool.acquire() as conn:
rows = await conn.fetch(
'''SELECT name
FROM subscription
JOIN exchange ON exchange.id = subscription.exchange_id
WHERE uid = $1 ''',
uid
)
return (row['name'] for row in rows) if rows else None
async def is_subscribed(uid: int, exchange_id: int) -> bool:
async with pool.acquire() as conn:
res = await conn.fetchrow(
'''SELECT COUNT(*) FROM subscription WHERE
uid = $1 AND
exchange_id = $2''',
uid,
exchange_id
)
return res['count'] > 0
async def subscribe(uid, exchange_id, api_key, secret_key):
async with pool.acquire() as conn:
await conn.fetch(
'''INSERT INTO subscription (uid, exchange_id, api_key, secret_key)
VALUES ($1, $2, $3, $4)
ON CONFLICT (uid, exchange_id) DO UPDATE SET
api_key = $3,
secret_key = $4''',
uid,
exchange_id,
api_key,
secret_key
)
async def unsubscribe(uid, exchange_id):
async with pool.acquire() as conn:
await conn.fetch(
'''DELETE FROM subscription WHERE uid = $1 AND exchange_id = $2''',
uid,
exchange_id
)
async def add_orders(orders):
async with pool.acquire() as conn:
await conn.executemany(
'''INSERT INTO user_order (uid, exchange_id, order_id)
VALUES ($1, $2, $3)
ON CONFLICT DO NOTHING''',
orders
)
async def get_order_ids(exchange_id, uid):
async with pool.acquire() as conn:
rows = await conn.fetch(
'''SELECT order_id FROM user_order WHERE exchange_id = $1 AND uid = $2''',
exchange_id,
uid
)
return {row['order_id'] for row in rows}
async def get_uids():
async with pool.acquire() as conn:
rows = await conn.fetch(
'''SELECT DISTINCT uid FROM subscription'''
)
return (row['uid'] for row in rows)
async def get_keys(uid, exchange_id):
async with pool.acquire() as conn:
row = await conn.fetchrow(
'''SELECT api_key, secret_key FROM subscription WHERE uid = $1 AND exchange_id = $2''',
uid,
exchange_id
)
if not row:
return None, None
return row['api_key'], row['secret_key']