Yessiest
2 weeks ago
6 changed files with 323 additions and 10 deletions
-
2README.md
-
2examples/session.ru
-
32examples/websockets.ru
-
286lib/landline/extensions/websocket.rb
-
8lib/landline/path.rb
-
3lib/landline/probe/handler.rb
@ -0,0 +1,32 @@ |
|||
# frozen_string_literal: true |
|||
|
|||
require 'landline' |
|||
require 'landline/extensions/websocket' |
|||
|
|||
class Test < Landline::App |
|||
setup do |
|||
websocket "/test", version: 7 do |socket| |
|||
socket.on :message do |msg| |
|||
puts "Client wrote: #{msg}" |
|||
end |
|||
socket.on :error do |err| |
|||
puts "Error occured: #{err.inspect}" |
|||
puts err.backtrace |
|||
end |
|||
socket.on :close do |
|||
puts "Client closed read connection" |
|||
end |
|||
socket.ready |
|||
socket.write("Hi!") |
|||
response = socket.read |
|||
socket.write("You said: #{response}") |
|||
socket.write("Goodbye!") |
|||
socket.close |
|||
rescue Exception => e |
|||
puts e.inspect |
|||
puts e.backtrace |
|||
end |
|||
end |
|||
end |
|||
|
|||
run Test.new |
@ -0,0 +1,286 @@ |
|||
# frozen_string_literal: true |
|||
|
|||
require 'websocket' |
|||
module Landline |
|||
# Module that holds websocket primitives |
|||
module WebSocket |
|||
# Event system |
|||
module Eventifier |
|||
# Attach event listener |
|||
# @param event [Symbol] |
|||
# @param listener [#call] |
|||
def on(event, &listener) |
|||
@__listeners ||= {} |
|||
@__listeners[event] ||= [] |
|||
@__listeners[event].append(listener) |
|||
listener |
|||
end |
|||
|
|||
# Attach event listener |
|||
# @param event [Symbol] |
|||
# @param listener [#call] |
|||
def off(event, listener) |
|||
@__listeners ||= {} |
|||
@__listeners[event]&.delete(listener) |
|||
end |
|||
|
|||
# Await for an event |
|||
# @param event [Symbol, Array<Symbol>] event or array of events to wait for |
|||
# @return [Array] |
|||
# @sg-ignore |
|||
def await(event) |
|||
blocking = true |
|||
output = nil |
|||
listener = proc do |*data| |
|||
output = data |
|||
blocking = false |
|||
end |
|||
if event.is_a? Array |
|||
event.each { |x| on(x, &listener) } |
|||
else |
|||
on(event, &listener) |
|||
end |
|||
while blocking; end |
|||
return output[0] if output.is_a? Array and output.length == 1 |
|||
|
|||
output |
|||
end |
|||
|
|||
private |
|||
|
|||
# Trigger the queue clearing process |
|||
# @return [void] |
|||
def _process |
|||
return if @processing |
|||
|
|||
@__processing = true |
|||
@__queue ||= [] |
|||
@__listeners ||= {} |
|||
until @__queue.empty? |
|||
event, msg = @__queue.shift |
|||
if @__listeners.include? event |
|||
@__listeners[event].each { |x| x.call(*msg) } |
|||
end |
|||
end |
|||
@processing = false |
|||
end |
|||
|
|||
# Send internal event |
|||
# @param event [Symbol] |
|||
# @param data [Array] |
|||
# @return [void] |
|||
def _emit(event, *data) |
|||
return unless @__listeners |
|||
|
|||
return unless @__listeners.include? event |
|||
|
|||
@__queue ||= [] |
|||
@__queue.push([event, data]) |
|||
_process |
|||
end |
|||
end |
|||
|
|||
# Socket-like object representing websocket interface |
|||
class WSockWrapper |
|||
include Eventifier |
|||
|
|||
class WebSocketError < StandardError |
|||
end |
|||
|
|||
def initialize(io, version: 7) |
|||
@io = io |
|||
@version = version |
|||
@frame_parser = ::WebSocket::Frame::Incoming::Server.new( |
|||
version: version |
|||
) |
|||
@readable = true |
|||
@writable = true |
|||
@data = Queue.new |
|||
on :message do |msg| |
|||
@data.enq(msg) |
|||
end |
|||
end |
|||
|
|||
# Start the main loop for the eventifier |
|||
# @return [void] |
|||
def ready |
|||
return if @ready |
|||
|
|||
_loop |
|||
@ready = true |
|||
end |
|||
|
|||
# Send data through websocket |
|||
# @param data [String] binary data |
|||
# @return [void] |
|||
def write(data, type: :text) |
|||
unless @writable |
|||
raise self.class::WebSocketError, |
|||
"socket closed for writing" |
|||
end |
|||
|
|||
frame = ::WebSocket::Frame::Outgoing::Server.new( |
|||
version: @version, |
|||
data: data, |
|||
type: type |
|||
) |
|||
@io.write(frame.to_s) |
|||
end |
|||
|
|||
# Read data from socket synchronously |
|||
# @return [String, nil] nil returned if socket closes |
|||
def read |
|||
unless @readable |
|||
raise self.class::WebSocketError, |
|||
"socket closed for reading" |
|||
end |
|||
|
|||
@data.deq |
|||
end |
|||
|
|||
# Close the socket for reading |
|||
# @return [void] |
|||
def close_read |
|||
_emit :close |
|||
@readable = false |
|||
@io.close_read |
|||
end |
|||
|
|||
# Close the socket for writing |
|||
def close_write |
|||
@writable = false |
|||
@io.close_write |
|||
end |
|||
|
|||
# Establish a connection through handshake |
|||
# @return [self] |
|||
def self.handshake(request, version: 7, **opts) |
|||
raise StandardError, "stream cannot be hijacked" unless request.hijack |
|||
|
|||
handshake = create_handshake(request, version: version, **opts) |
|||
return nil unless handshake |
|||
|
|||
io = request.hijack.call |
|||
io.sendmsg(handshake.to_s) |
|||
new(io, version: version) |
|||
end |
|||
|
|||
# Initiate a handshake |
|||
def self.create_handshake(request, **opts) |
|||
handshake = ::WebSocket::Handshake::Server.new(**opts) |
|||
handshake.from_hash({ |
|||
headers: request.headers, |
|||
path: request.path_info, |
|||
query: request.query.query, |
|||
body: request.body |
|||
}) |
|||
return nil unless handshake.finished? and handshake.valid? |
|||
|
|||
handshake |
|||
end |
|||
|
|||
# Close the socket |
|||
# @return [void] |
|||
def close |
|||
_close |
|||
@writable = false |
|||
@readable = false |
|||
end |
|||
|
|||
private |
|||
|
|||
# Event reading loop |
|||
# @return [void] |
|||
def _loop |
|||
@thread = Thread.new do |
|||
loop do |
|||
msg = _read |
|||
if msg and [:text, :binary].include? msg.type |
|||
_emit :message, msg |
|||
elsif msg and msg.type == :close |
|||
_emit :__close, msg |
|||
break |
|||
end |
|||
end |
|||
rescue IOError => e |
|||
@writable = false |
|||
_emit :error, e |
|||
close |
|||
ensure |
|||
close_read |
|||
end |
|||
end |
|||
|
|||
# Receive data through websocket |
|||
# @return [String] output from frame |
|||
def _read |
|||
while (char = @io.getc) |
|||
@frame_parser << char |
|||
frame = @frame_parser.next |
|||
return frame if frame |
|||
end |
|||
end |
|||
|
|||
# Close the websocket |
|||
# @return [void] |
|||
def _close |
|||
frame = ::WebSocket::Frame::Outgoing::Server.new( |
|||
version: @version, |
|||
type: :close |
|||
) |
|||
@io.write(frame.to_s) if @writable |
|||
sleep 0.1 |
|||
@io.close |
|||
end |
|||
end |
|||
end |
|||
end |
|||
|
|||
module Landline |
|||
module Handlers |
|||
# Web socket server handler |
|||
class WebSockServer < Landline::Probe |
|||
# @param path [Object] |
|||
# @param parent [Landline::Node] |
|||
# @param params [Hash] options hash |
|||
# @param callback [#call] callback to process request within |
|||
# @option params [Integer] :version protocol version |
|||
# @option params [Array<String>] :protocols array of supported sub-protocols |
|||
# @option params [Boolean] :secure true if the server will use wss:// protocol |
|||
def initialize(path, parent:, **params, &callback) |
|||
nodeparams = params.dup |
|||
nodeparams.delete(:version) |
|||
nodeparams.delete(:protocols) |
|||
nodeparams.delete(:secure) |
|||
super(path, parent: parent, **nodeparams) |
|||
@callback = callback |
|||
@params = params |
|||
end |
|||
|
|||
# Method callback on successful request navigation |
|||
# Creates a websocket from a given request |
|||
# @param request [Landline::Request] |
|||
def process(request) |
|||
@callback.call( |
|||
Landline::WebSocket::WSockWrapper.handshake( |
|||
request, |
|||
**@params |
|||
) |
|||
) |
|||
end |
|||
end |
|||
end |
|||
|
|||
module DSL |
|||
module PathConstructors |
|||
# (in Landline::Path context) |
|||
# Create a new websocket handler |
|||
def websocket(path, **args, &setup) |
|||
register(Landline::Handlers::WebSockServer.new(path, |
|||
parent: @origin, |
|||
**args, |
|||
&setup)) |
|||
end |
|||
end |
|||
end |
|||
end |
Write
Preview
Loading…
Cancel
Save
Reference in new issue