diff --git a/examples/websockets.ru b/examples/websockets.ru index 1475584..b74f93f 100644 --- a/examples/websockets.ru +++ b/examples/websockets.ru @@ -5,9 +5,6 @@ require 'landline/extensions/websocket' class Test < Landline::App websocket "/test", version: 7 do |socket| - socket.on :message do |msg| - puts "Client wrote: #{msg}" - end socket.on :error do |err| puts "Error occured: #{err.inspect}" puts err.backtrace @@ -15,12 +12,17 @@ class Test < Landline::App socket.on :close do puts "Client closed read connection" end - socket.ready socket.write("Hi!") - response = socket.read - socket.write("You said: #{response}") + while (response = socket.read) + if response + puts "Client wrote: #{response.inspect}" + else + puts "Client closed read connection" + end + socket.write("You said: #{response}") + end socket.write("Goodbye!") - socket.close + socket.close_write rescue Exception => e puts e.inspect puts e.backtrace diff --git a/lib/landline/extensions/websocket.rb b/lib/landline/extensions/websocket.rb index 9678813..c497958 100644 --- a/lib/landline/extensions/websocket.rb +++ b/lib/landline/extensions/websocket.rb @@ -95,19 +95,6 @@ module Landline ) @readable = true @writable = true - @data = Queue.new - on :message do |msg| - @data.enq(msg) - end - end - - # Start the main loop for the eventifier - # @return [void] - def ready - return if @ready - - _loop - @ready = true end # Send data through websocket @@ -126,34 +113,30 @@ module Landline ) @io.write(frame.to_s) rescue Errno::EPIPE => e - @writable = false _emit :error, e close end # Read data from socket synchronously - # @return [String, nil] nil returned if socket closes + # @return [WebSocket::Frame::Base, nil] nil if socket received a close event def read unless @readable raise self.class::WebSocketError, "socket closed for reading" end - @data.deq + _process_events(proc { _read }) end - # Close the socket for reading - # @return [void] - def close_read - _emit :close - @readable = false - @io.close_read - end + # Read data from socket without blocking + # @return [WebSocket::Frame::Base, nil] nil if socket received a close event + def read_nonblock + unless @readable + raise self.class::WebSocketError, + "socket closed for reading" + end - # Close the socket for writing - def close_write - @writable = false - @io.close_write + _process_events(proc { _read_nonblock }) end # Establish a connection through handshake @@ -183,62 +166,78 @@ module Landline handshake end - # Close the socket - # @return [void] - def close - _close - @writable = false + # Close socket for reading + def close_read + raise WebSocketError, 'socket closed for reading' unless @readable + + _emit :close @readable = false + @io.close_read end + # Close socket for reading + def close_write + raise WebSocketError, 'socket closed for writing' unless @writable + + write(nil, type: :close) + @writable = false + @io.close_write + end + + # Close the socket entirely + def close + raise WebSocketError, 'socket closed' unless @writable or @readable + + close_read if @readable + close_write if @writable + end + + attr_reader :io, :readable, :writable + private - # Event reading loop - # @return [void] - def _loop - @thread = Thread.new do - loop do - msg = _read - if msg and [:text, :binary].include? msg.type - _emit :message, msg - elsif msg and msg.type == :close - _emit :__close, msg - break - end + # Process incoming websocket events + # @param next_frame [#call] callback to get the next frame + # @return [WebSocket::Frame::Base, nil] + def _process_events(next_frame) + loop do + frame = next_frame.call + case frame.type + when :binary, :text, :pong then return frame + when :ping + write frame.to_s, type: :pong + when :close + close_read + return nil + else raise WebSocketError, "unknown frame type #{frame.type}" end - rescue IOError => e - @writable = false - _emit :error, e - close - ensure - close_read end end # Receive data through websocket # @return [String] output from frame def _read - while (char = @io.getc) + while (char = @io.read(1)) @frame_parser << char frame = @frame_parser.next return frame if frame end rescue Errno::ECONNRESET => e - @writable = false _emit :error, e close end - # Close the websocket - # @return [void] - def _close - frame = ::WebSocket::Frame::Outgoing::Server.new( - version: @version, - type: :close - ) - @io.write(frame.to_s) if @writable - sleep 0.1 - @io.close + # Receive data through websocket asynchronously + # @return [String] output from frame + def _read_nonblock + while (char = @io.read_nonblock(1)) + @frame_parser << char + frame = @frame_parser.next + return frame if frame + end + rescue Errno::ECONNRESET => e + _emit :error, e + close end end end