synchronous sockets, no more threads
This commit is contained in:
parent
20c312d9a9
commit
6566c2af5c
|
@ -5,9 +5,6 @@ require 'landline/extensions/websocket'
|
|||
|
||||
class Test < Landline::App
|
||||
websocket "/test", version: 7 do |socket|
|
||||
socket.on :message do |msg|
|
||||
puts "Client wrote: #{msg}"
|
||||
end
|
||||
socket.on :error do |err|
|
||||
puts "Error occured: #{err.inspect}"
|
||||
puts err.backtrace
|
||||
|
@ -15,12 +12,17 @@ class Test < Landline::App
|
|||
socket.on :close do
|
||||
puts "Client closed read connection"
|
||||
end
|
||||
socket.ready
|
||||
socket.write("Hi!")
|
||||
response = socket.read
|
||||
while (response = socket.read)
|
||||
if response
|
||||
puts "Client wrote: #{response.inspect}"
|
||||
else
|
||||
puts "Client closed read connection"
|
||||
end
|
||||
socket.write("You said: #{response}")
|
||||
end
|
||||
socket.write("Goodbye!")
|
||||
socket.close
|
||||
socket.close_write
|
||||
rescue Exception => e
|
||||
puts e.inspect
|
||||
puts e.backtrace
|
||||
|
|
|
@ -95,19 +95,6 @@ module Landline
|
|||
)
|
||||
@readable = true
|
||||
@writable = true
|
||||
@data = Queue.new
|
||||
on :message do |msg|
|
||||
@data.enq(msg)
|
||||
end
|
||||
end
|
||||
|
||||
# Start the main loop for the eventifier
|
||||
# @return [void]
|
||||
def ready
|
||||
return if @ready
|
||||
|
||||
_loop
|
||||
@ready = true
|
||||
end
|
||||
|
||||
# Send data through websocket
|
||||
|
@ -126,34 +113,30 @@ module Landline
|
|||
)
|
||||
@io.write(frame.to_s)
|
||||
rescue Errno::EPIPE => e
|
||||
@writable = false
|
||||
_emit :error, e
|
||||
close
|
||||
end
|
||||
|
||||
# Read data from socket synchronously
|
||||
# @return [String, nil] nil returned if socket closes
|
||||
# @return [WebSocket::Frame::Base, nil] nil if socket received a close event
|
||||
def read
|
||||
unless @readable
|
||||
raise self.class::WebSocketError,
|
||||
"socket closed for reading"
|
||||
end
|
||||
|
||||
@data.deq
|
||||
_process_events(proc { _read })
|
||||
end
|
||||
|
||||
# Close the socket for reading
|
||||
# @return [void]
|
||||
def close_read
|
||||
_emit :close
|
||||
@readable = false
|
||||
@io.close_read
|
||||
# Read data from socket without blocking
|
||||
# @return [WebSocket::Frame::Base, nil] nil if socket received a close event
|
||||
def read_nonblock
|
||||
unless @readable
|
||||
raise self.class::WebSocketError,
|
||||
"socket closed for reading"
|
||||
end
|
||||
|
||||
# Close the socket for writing
|
||||
def close_write
|
||||
@writable = false
|
||||
@io.close_write
|
||||
_process_events(proc { _read_nonblock })
|
||||
end
|
||||
|
||||
# Establish a connection through handshake
|
||||
|
@ -183,62 +166,78 @@ module Landline
|
|||
handshake
|
||||
end
|
||||
|
||||
# Close the socket
|
||||
# @return [void]
|
||||
def close
|
||||
_close
|
||||
@writable = false
|
||||
# Close socket for reading
|
||||
def close_read
|
||||
raise WebSocketError, 'socket closed for reading' unless @readable
|
||||
|
||||
_emit :close
|
||||
@readable = false
|
||||
@io.close_read
|
||||
end
|
||||
|
||||
# Close socket for reading
|
||||
def close_write
|
||||
raise WebSocketError, 'socket closed for writing' unless @writable
|
||||
|
||||
write(nil, type: :close)
|
||||
@writable = false
|
||||
@io.close_write
|
||||
end
|
||||
|
||||
# Close the socket entirely
|
||||
def close
|
||||
raise WebSocketError, 'socket closed' unless @writable or @readable
|
||||
|
||||
close_read if @readable
|
||||
close_write if @writable
|
||||
end
|
||||
|
||||
attr_reader :io, :readable, :writable
|
||||
|
||||
private
|
||||
|
||||
# Event reading loop
|
||||
# @return [void]
|
||||
def _loop
|
||||
@thread = Thread.new do
|
||||
# Process incoming websocket events
|
||||
# @param next_frame [#call] callback to get the next frame
|
||||
# @return [WebSocket::Frame::Base, nil]
|
||||
def _process_events(next_frame)
|
||||
loop do
|
||||
msg = _read
|
||||
if msg and [:text, :binary].include? msg.type
|
||||
_emit :message, msg
|
||||
elsif msg and msg.type == :close
|
||||
_emit :__close, msg
|
||||
break
|
||||
end
|
||||
end
|
||||
rescue IOError => e
|
||||
@writable = false
|
||||
_emit :error, e
|
||||
close
|
||||
ensure
|
||||
frame = next_frame.call
|
||||
case frame.type
|
||||
when :binary, :text, :pong then return frame
|
||||
when :ping
|
||||
write frame.to_s, type: :pong
|
||||
when :close
|
||||
close_read
|
||||
return nil
|
||||
else raise WebSocketError, "unknown frame type #{frame.type}"
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Receive data through websocket
|
||||
# @return [String] output from frame
|
||||
def _read
|
||||
while (char = @io.getc)
|
||||
while (char = @io.read(1))
|
||||
@frame_parser << char
|
||||
frame = @frame_parser.next
|
||||
return frame if frame
|
||||
end
|
||||
rescue Errno::ECONNRESET => e
|
||||
@writable = false
|
||||
_emit :error, e
|
||||
close
|
||||
end
|
||||
|
||||
# Close the websocket
|
||||
# @return [void]
|
||||
def _close
|
||||
frame = ::WebSocket::Frame::Outgoing::Server.new(
|
||||
version: @version,
|
||||
type: :close
|
||||
)
|
||||
@io.write(frame.to_s) if @writable
|
||||
sleep 0.1
|
||||
@io.close
|
||||
# Receive data through websocket asynchronously
|
||||
# @return [String] output from frame
|
||||
def _read_nonblock
|
||||
while (char = @io.read_nonblock(1))
|
||||
@frame_parser << char
|
||||
frame = @frame_parser.next
|
||||
return frame if frame
|
||||
end
|
||||
rescue Errno::ECONNRESET => e
|
||||
_emit :error, e
|
||||
close
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue