forked from devyte/nodemcu-platform
-
Notifications
You must be signed in to change notification settings - Fork 0
/
tbconnection.lua
203 lines (171 loc) · 6.47 KB
/
tbconnection.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
-- httpserver-tbconnection
-- Threaded buffered connection, meant for use with nodemcu net sockets
-- Author: Daniel Salazar
-- Based on work by Philip Gladstone, Marcos Kirsch
--[[--
--receives a buffered connection with similar interface to net sockets
function onReceive(bconn, payload)
bconn:send("blah")
bconn:send("bleh")
bconn:send("some arbitrarily long string that could even be several kbs in size, as long as heap doesn't run out")
end
srv = net.createServer(port, to)
srv:listen(80,
function(conn)
conn:on("receive",
function(conn, payload)
tbconn = dofile("threadbuffconnection.lc")(conn)
tbconn:run(onReceive, payload) --runs function in coroutine, passes args to it
end
)
end
)
tbconn:run(func, ...)
tbconn:close()
bconn:send(payload)
bconn:flush()
bconn:close()
bconn:onSent()
bconn:onReceive()
bconn:onClose()
bconn:onSent()
...
--]]--
return function(connectionArg, keepconnectionArg)
local connection = connectionArg
local keepconnection = false --default: close connection once done runnin in thread
if keepconnectionArg ~= nil then
keepconnection = keepconnectionArg
end
local connectionThread = nil
local onDisconnectionUserCallback = nil --called when connection gets dropped
local onSentUserCallback = nil --called when functionArg is done sending (i.e.: returns in coroutine). Call is done via node.task.post, so callback executes in main thread instead of coroutine.
-- clean up closure context
local function cleanup()
if connection and not keepconnection then
connection:close()
connection = nil
end
connectionThread = nil
onDisconnectionUserCallback = nil
onSentUserCallback = nil
collectgarbage()
end
local bufferedConnection = {}
function bufferedConnection:new(socketArg)
--private context
local socket = socketArg
local flushthreshold = 256 --this is how much payload gets buffered in bytes and sent as one piece to the socket
local size = 0
local data = {}
--public interface
local interface = {}
function interface:flush()
if size > 0 then
socket:send(table.concat(data, ""))
data = {}
size = 0
collectgarbage()
coroutine.yield() --allow data to be physically sent, resume from the onSent() socket callback once done
return true
end
return false
end
function interface:send(payload)
local newsize = size + payload:len()
while newsize > flushthreshold do
--STEP1: cut out piece from payload to complete threshold bytes in table
local piecesize = flushthreshold - size
local piece = payload:sub(1, piecesize)
payload = payload:sub(piecesize + 1, -1)
--STEP2: insert piece into table
table.insert(data, piece)
size = size + piecesize --size should be same as flushthreshold
--STEP3: flush entire table
self:flush()
--at this point, size should be 0, because the table was just flushed
newsize = size + payload:len()
end
--at this point, whatever is left in the table plus whatever is left in payload should be <= flushthreshold
if newsize == flushthreshold then
--case 1: what is left in table + payload is exactly flushthreshold bytes (boundary case), so flush it
table.insert(data, payload)
size = size + payload:len() --size shoulde be same as flushthreshold
self:flush()
elseif payload:len() then
--case 2: what is left in table+payload is less than flushthreshold, so just buffer payload if not empty
table.insert(data, payload)
size = size + payload:len()
--else, case 3: nothing left in payload, so do nothing
end
end
function interface:close()
self:flush()
socket:close()
cleanup()
end
return interface
end
local threadedBufferedConnection = {}
function threadedBufferedConnection:run(functionArg, ...)
connectionThread = coroutine.create(
function(functionArg, ...)
local bconn = bufferedConnection:new(connection)
functionArg(bconn, unpack(arg))
bconn:flush()
bconn = nil
collectgarbage()
if onSentUserCallback then
node.task.post(node.task.MEDIUM_PRIORITY, onSentUserCallback)
end
cleanup()
collectgarbage()
end
)
local status, err = coroutine.resume(connectionThread, functionArg, unpack(arg))
collectgarbage()
if not status then
print("Error: ", err)
end
end
function threadedBufferedConnection:close()
connection:close()
cleanup()
end
function threadedBufferedConnection:on(event, callback)
if event == "disconnection" then
onDisconnectionUserCallback = callback
elseif event == "sent" then
onSentUserCallback = callback
else
error("Error: unknown event: "..event)
end
end
local function onDisconnection(conn)
if onDisconnectionUserCallback then
onDisconnectionUserCallback(conn)
end
if connectionThread then
cleanup()
end
end
local function onSent(conn)
collectgarbage()
if connectionThread then
local connectionThreadStatus = coroutine.status(connectionThread)
if connectionThreadStatus == "suspended" then
-- Not finished with function, resume.
local status, err = coroutine.resume(connectionThread)
if not status then
print("Error: "..err)
end
elseif connectionThreadStatus == "dead" then
-- should never reach this, but play it safe
cleanup()
end
end
end
connection:on("disconnection", onDisconnection)
connection:on("sent", onSent)
return threadedBufferedConnection
end