From 005edca228d6c78b306d2d27d9a960b813f9f6c2 Mon Sep 17 00:00:00 2001 From: Yessiest Date: Sun, 28 Apr 2024 18:39:42 +0400 Subject: [PATCH] Websockets (experimental) --- README.md | 2 +- examples/session.ru | 2 +- examples/websockets.ru | 32 +++ lib/landline/extensions/websocket.rb | 286 +++++++++++++++++++++++++++ lib/landline/path.rb | 8 +- lib/landline/probe/handler.rb | 3 - 6 files changed, 323 insertions(+), 10 deletions(-) create mode 100644 examples/websockets.ru create mode 100644 lib/landline/extensions/websocket.rb diff --git a/README.md b/README.md index a00f600..bfab5c1 100644 --- a/README.md +++ b/README.md @@ -176,7 +176,7 @@ that some of that functionality may require additional dependencies, which would otherwise be optional. This functionality includes: - PHP-like Session handling (via `landline/extensions/session`) -- Websockets (via `landline/extensions/websockets`) (coming soon) +- Websockets (via `landline/extensions/websockets`) (available for testing) - (Probably something else eventually) Landline is built entirely on Rack webserver interface, while being agnostic diff --git a/examples/session.ru b/examples/session.ru index 00fef29..2908b9d 100644 --- a/examples/session.ru +++ b/examples/session.ru @@ -3,7 +3,7 @@ require 'landline/extensions/session' require 'landline' -Landline::Session.hmac_secret = "Your secure signing secret here" +#Landline::Session.hmac_secret = "Your secure signing secret here" app = Landline::Server.new do get "/make_cookie" do diff --git a/examples/websockets.ru b/examples/websockets.ru new file mode 100644 index 0000000..012cd84 --- /dev/null +++ b/examples/websockets.ru @@ -0,0 +1,32 @@ +# frozen_string_literal: true + +require 'landline' +require 'landline/extensions/websocket' + +class Test < Landline::App + setup do + websocket "/test", version: 7 do |socket| + socket.on :message do |msg| + puts "Client wrote: #{msg}" + end + socket.on :error do |err| + puts "Error occured: #{err.inspect}" + puts err.backtrace + end + socket.on :close do + puts "Client closed read connection" + end + socket.ready + socket.write("Hi!") + response = socket.read + socket.write("You said: #{response}") + socket.write("Goodbye!") + socket.close + rescue Exception => e + puts e.inspect + puts e.backtrace + end + end +end + +run Test.new diff --git a/lib/landline/extensions/websocket.rb b/lib/landline/extensions/websocket.rb new file mode 100644 index 0000000..d1bfc70 --- /dev/null +++ b/lib/landline/extensions/websocket.rb @@ -0,0 +1,286 @@ +# frozen_string_literal: true + +require 'websocket' +module Landline + # Module that holds websocket primitives + module WebSocket + # Event system + module Eventifier + # Attach event listener + # @param event [Symbol] + # @param listener [#call] + def on(event, &listener) + @__listeners ||= {} + @__listeners[event] ||= [] + @__listeners[event].append(listener) + listener + end + + # Attach event listener + # @param event [Symbol] + # @param listener [#call] + def off(event, listener) + @__listeners ||= {} + @__listeners[event]&.delete(listener) + end + + # Await for an event + # @param event [Symbol, Array] event or array of events to wait for + # @return [Array] + # @sg-ignore + def await(event) + blocking = true + output = nil + listener = proc do |*data| + output = data + blocking = false + end + if event.is_a? Array + event.each { |x| on(x, &listener) } + else + on(event, &listener) + end + while blocking; end + return output[0] if output.is_a? Array and output.length == 1 + + output + end + + private + + # Trigger the queue clearing process + # @return [void] + def _process + return if @processing + + @__processing = true + @__queue ||= [] + @__listeners ||= {} + until @__queue.empty? + event, msg = @__queue.shift + if @__listeners.include? event + @__listeners[event].each { |x| x.call(*msg) } + end + end + @processing = false + end + + # Send internal event + # @param event [Symbol] + # @param data [Array] + # @return [void] + def _emit(event, *data) + return unless @__listeners + + return unless @__listeners.include? event + + @__queue ||= [] + @__queue.push([event, data]) + _process + end + end + + # Socket-like object representing websocket interface + class WSockWrapper + include Eventifier + + class WebSocketError < StandardError + end + + def initialize(io, version: 7) + @io = io + @version = version + @frame_parser = ::WebSocket::Frame::Incoming::Server.new( + version: version + ) + @readable = true + @writable = true + @data = Queue.new + on :message do |msg| + @data.enq(msg) + end + end + + # Start the main loop for the eventifier + # @return [void] + def ready + return if @ready + + _loop + @ready = true + end + + # Send data through websocket + # @param data [String] binary data + # @return [void] + def write(data, type: :text) + unless @writable + raise self.class::WebSocketError, + "socket closed for writing" + end + + frame = ::WebSocket::Frame::Outgoing::Server.new( + version: @version, + data: data, + type: type + ) + @io.write(frame.to_s) + end + + # Read data from socket synchronously + # @return [String, nil] nil returned if socket closes + def read + unless @readable + raise self.class::WebSocketError, + "socket closed for reading" + end + + @data.deq + end + + # Close the socket for reading + # @return [void] + def close_read + _emit :close + @readable = false + @io.close_read + end + + # Close the socket for writing + def close_write + @writable = false + @io.close_write + end + + # Establish a connection through handshake + # @return [self] + def self.handshake(request, version: 7, **opts) + raise StandardError, "stream cannot be hijacked" unless request.hijack + + handshake = create_handshake(request, version: version, **opts) + return nil unless handshake + + io = request.hijack.call + io.sendmsg(handshake.to_s) + new(io, version: version) + end + + # Initiate a handshake + def self.create_handshake(request, **opts) + handshake = ::WebSocket::Handshake::Server.new(**opts) + handshake.from_hash({ + headers: request.headers, + path: request.path_info, + query: request.query.query, + body: request.body + }) + return nil unless handshake.finished? and handshake.valid? + + handshake + end + + # Close the socket + # @return [void] + def close + _close + @writable = false + @readable = false + end + + private + + # Event reading loop + # @return [void] + def _loop + @thread = Thread.new do + loop do + msg = _read + if msg and [:text, :binary].include? msg.type + _emit :message, msg + elsif msg and msg.type == :close + _emit :__close, msg + break + end + end + rescue IOError => e + @writable = false + _emit :error, e + close + ensure + close_read + end + end + + # Receive data through websocket + # @return [String] output from frame + def _read + while (char = @io.getc) + @frame_parser << char + frame = @frame_parser.next + return frame if frame + end + end + + # Close the websocket + # @return [void] + def _close + frame = ::WebSocket::Frame::Outgoing::Server.new( + version: @version, + type: :close + ) + @io.write(frame.to_s) if @writable + sleep 0.1 + @io.close + end + end + end +end + +module Landline + module Handlers + # Web socket server handler + class WebSockServer < Landline::Probe + # @param path [Object] + # @param parent [Landline::Node] + # @param params [Hash] options hash + # @param callback [#call] callback to process request within + # @option params [Integer] :version protocol version + # @option params [Array] :protocols array of supported sub-protocols + # @option params [Boolean] :secure true if the server will use wss:// protocol + def initialize(path, parent:, **params, &callback) + nodeparams = params.dup + nodeparams.delete(:version) + nodeparams.delete(:protocols) + nodeparams.delete(:secure) + super(path, parent: parent, **nodeparams) + @callback = callback + @params = params + end + + # Method callback on successful request navigation + # Creates a websocket from a given request + # @param request [Landline::Request] + def process(request) + @callback.call( + Landline::WebSocket::WSockWrapper.handshake( + request, + **@params + ) + ) + end + end + end + + module DSL + module PathConstructors + # (in Landline::Path context) + # Create a new websocket handler + def websocket(path, **args, &setup) + register(Landline::Handlers::WebSockServer.new(path, + parent: @origin, + **args, + &setup)) + end + end + end +end diff --git a/lib/landline/path.rb b/lib/landline/path.rb index 52db57f..8176ff7 100644 --- a/lib/landline/path.rb +++ b/lib/landline/path.rb @@ -100,7 +100,7 @@ module Landline @filters.append(block) end - attr_reader :children, :properties, :request + attr_reader :children, :properties attr_accessor :bounce, :pipeline @@ -157,9 +157,7 @@ module Landline @bounce ? exit_stack(request) : _die(404) rescue StandardError => e - _die(500, backtrace: [e.to_s] + e.backtrace) - ensure - @request = nil + _die(request, 500, backtrace: [e.to_s] + e.backtrace) end # Run enqueued postprocessors on navigation failure @@ -190,7 +188,7 @@ module Landline # @param errorcode [Integer] # @param backtrace [Array(String), nil] # @raise [UncaughtThrowError] throws :finish to stop processing - def _die(errorcode, backtrace: nil) + def _die(request, errorcode, backtrace: nil) proccontext = get_context(request) throw :finish, [errorcode].append( *proccontext.instance_exec( diff --git a/lib/landline/probe/handler.rb b/lib/landline/probe/handler.rb index f6e1eb6..e090d85 100644 --- a/lib/landline/probe/handler.rb +++ b/lib/landline/probe/handler.rb @@ -14,9 +14,6 @@ module Landline @callback = exec end - attr_accessor :response - attr_reader :request - # Method callback on successful request navigation. # Runs block supplied with object initialization. # Request's #splat and #param are passed to block.