-
Notifications
You must be signed in to change notification settings - Fork 0
/
write_temp_to_db.py
128 lines (93 loc) · 4.15 KB
/
write_temp_to_db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#!/usr/bin/python
import time
from gpiozero import MCP3008
from datetime import datetime as dt
import numpy as np
from my_functions import get_last_smoke_session_id, get_connection, hit_db, read_data
# there are 4 positions in the ADC because I have 4 temp probes ports.
def get_adc_value():
for pos in range(0,4):
if pos not in adcs:
adcs[pos] = MCP3008(pos)
vals = np.array([adcs[pos].value for pos in range(0,4)])
vals[vals > .985] = None
return vals
def get_temp_percent():
x = np.zeros((4,5))
for i in range(0, 5):
x[:,i] = get_adc_value()
time.sleep(.2)
return np.average(x,axis = 1)
def get_resistance(temp_percent, r=46000):
return r*temp_percent/(1-temp_percent)
#Steinhart–Hart equation converts resistance to temperature
def get_temp(R, thermistor_version):
if thermistor_version == 'old':
A = 0.6872188391*10**-3
B = 2.103627383*10**-4
C = 0.5449073998*10**-7
elif thermistor_version == 'new':
A = 1.216527419*10**-3
B = 1.337979364*10**-4
C = 3.947291334*10**-7
T = (A +B*np.log(R)+C*(np.log(R)**3) )**-1
T = T*(9/5) - 459.67
return(T)
def write_temp(temp, smoke_session_id, connection):
local_time = dt.now().strftime('%Y-%m-%d %H:%M:%S')
sql = """insert into recorded_data (smoke_session_id, date_time, temp0, temp1, temp2, temp3)
values ({}, '{}', {}, {}, {}, {} )
""".format(smoke_session_id, local_time, temp[0], temp[1], temp[2], temp[3] )
hit_db(sql, connection)
########################################################################################
def write_new_ss(new_ss, connection):
sql = """insert into smoke_session (date_time, meat_type, kilos, notes)
values (now(), '{}', '{}', '{}');
""".format(new_ss['meat_type'], new_ss['kilos'], new_ss['notes'])
return hit_db(sql, connection)
def check_smoke_session(connection):
smoke_session_id = get_last_smoke_session_id(connection)
if smoke_session_id == None:
first_smoke = input("Is this your first smoke in this db? (y/n): ")
if first_smoke == 'y':
continue_ss = 'n'
else:
raise ValueError('I cant find any earlier smoke sessions. Try debugging.')
else:
last_ss = read_data('smoke_session', smoke_session_id, connection)
print('Last Smoke Session is:')
print('ID: {}'.format(last_ss['smoke_session_id']))
print('Start: {}'.format(last_ss['date_time']))
print('Meat Type: {}'.format(last_ss['meat_type']))
print('kilos: {}'.format(last_ss['kilos']))
print('notes: {}'.format(last_ss['notes']))
print('')
continue_ss = input("Do you want to continue this smoke session? (y/n): ")
if continue_ss== 'n':
new_ss = {}
new_ss['meat_type'] = input('Enter meat type: ')
new_ss['kilos'] = float(input('Enter number kilos: '))
new_ss['notes'] = input('Enter notes: ')
write_new_ss(new_ss, connection)
smoke_session_id = get_last_smoke_session_id(connection)
thermistor_version = input("Which thermistors are you using? (old/new): ")
return smoke_session_id, thermistor_version
########################################################################################
if __name__ == "__main__":
adcs = {}
connection, login_info = get_connection()
smoke_session_id, thermistor_version = check_smoke_session(connection)
while True:
print('Start')
# temp_percent = get_temp_percent()
temp_percent = get_adc_value()
resistance = get_resistance(temp_percent = temp_percent)
temp = get_temp(R=resistance, thermistor_version=thermistor_version)
temp = temp.tolist()
temp = ['null' if (np.isnan(_)) else _ for _ in temp]
print(temp)
write_temp(temp, smoke_session_id, connection)
print('Data Written')
time.sleep(1)
print('Sleep time over')
print('')