-
Notifications
You must be signed in to change notification settings - Fork 1
/
ivy2.py
169 lines (127 loc) · 4.56 KB
/
ivy2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import time
import queue
from loguru import logger
from task import (
StartSessionTask,
GetStatusTask,
GetSettingTask,
SetSettingTask,
GetPrintReadyTask,
RebootTask
)
import image
from exceptions import (
ClientUnavailableError,
ReceiveTimeoutError,
AckError,
LowBatteryError,
CoverOpenError,
NoPaperError,
WrongSmartSheetError
)
from client import ClientThread
from utils import parse_incoming_message
PRINT_BATTERY_MIN = 30
PRINT_DATA_CHUNK = 990
class Ivy2Printer:
client = ClientThread()
def connect(self, mac_address, port=1):
self.client.connect(mac_address, port)
self.__start_session()
logger.debug("Connected")
def disconnect(self):
self.client.disconnect()
def is_connected(self):
return self.client.alive.is_set()
def print(self, target, transfer_timeout=60):
image_data = bytes()
if type(target) is str:
image_data = image.prepare_image(target)
elif type(target) is bytes:
image_data = target
else:
raise ValueError(
"Unsupported target; expected string or bytes but got {}".format(
type(target)
)
)
image_length = len(image_data)
self.check_print_worthiness()
self.get_setting()
# setup the printer to receive the image data
self.get_print_ready(image_length)
# split up the image and add to the client queue
start_index = 0
while True:
end_index = min(start_index + PRINT_DATA_CHUNK, image_length)
image_chunk = image_data[start_index:end_index]
self.client.outbound_q.put(image_chunk)
if end_index >= image_length:
break
start_index = end_index
logger.debug("Beginning data transfer...")
# wait longer than usual since the transfer takes some time
self.__receive_message(transfer_timeout)
logger.debug("Data transfer complete! Printing should begin in a moment")
def reboot(self):
return self.__perform_task(RebootTask())
def get_status(self):
return self.__perform_task(GetStatusTask())
def get_setting(self):
return self.__perform_task(GetSettingTask())
def set_setting(self, color_id, auto_power_off):
return self.__perform_task(SetSettingTask(color_id, auto_power_off))
def get_print_ready(self, length):
return self.__perform_task(GetPrintReadyTask(length))
def check_print_worthiness(self):
status = self.get_status()
error_code, battery_level, _, is_cover_open, is_no_paper, is_wrong_smart_sheet = status
if error_code != 0:
logger.error(
"Status contains a non-zero error code: {}",
error_code
)
if battery_level < PRINT_BATTERY_MIN:
raise LowBatteryError()
if is_cover_open:
raise CoverOpenError()
if is_no_paper:
raise NoPaperError()
if is_wrong_smart_sheet:
raise WrongSmartSheetError()
def __start_session(self):
return self.__perform_task(StartSessionTask())
def __perform_task(self, task):
# send the task's message
self.__send_message(task.get_message())
response = self.__receive_message()
if response[2] != task.ack:
raise AckError("Got invalid ack; expected {} but got {}".format(
task.ack, response[3]
))
# process and return the response
return task.process_response(response)
def __send_message(self, message):
if not self.client.alive.is_set():
raise ClientUnavailableError()
# add the message to the client thread's outbound queue
self.client.outbound_q.put(message)
def __receive_message(self, timeout=5):
start = int(time.time())
while int(time.time()) < (start + timeout):
if not self.client.alive.is_set():
raise ClientUnavailableError("")
try:
# attempt to read the client thread's inbound queue
response = parse_incoming_message(
self.client.inbound_q.get(False, 0.1)
)
logger.debug(
"Received message: ack: {}, error: {}",
response[2],
response[3]
)
return response
except queue.Empty:
pass
raise ReceiveTimeoutError()