This commit is contained in:
Samantaz Fox 2024-07-25 20:26:29 +00:00 committed by GitHub
commit 84a6621c63
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 427 additions and 128 deletions

View File

@ -343,21 +343,6 @@ full_refresh: false
##
feed_threads: 1
##
## Enable/Disable the polling job that keeps the decryption
## function (for "secured" videos) up to date.
##
## Note: This part of the code generate a small amount of data every minute.
## This may not be desired if you have bandwidth limits set by your ISP.
##
## Note 2: This part of the code is currently broken, so changing
## this setting has no impact.
##
## Accepted values: true, false
## Default: false
##
#decrypt_polling: false
jobs:

View File

@ -163,11 +163,6 @@ if CONFIG.feed_threads > 0
Invidious::Jobs.register Invidious::Jobs::RefreshFeedsJob.new(PG_DB)
end
DECRYPT_FUNCTION = DecryptFunction.new(CONFIG.decrypt_polling)
if CONFIG.decrypt_polling
Invidious::Jobs.register Invidious::Jobs::UpdateDecryptFunctionJob.new
end
if CONFIG.statistics_enabled
Invidious::Jobs.register Invidious::Jobs::StatisticsRefreshJob.new(PG_DB, SOFTWARE)
end

View File

@ -74,8 +74,6 @@ class Config
# Database configuration using 12-Factor "Database URL" syntax
@[YAML::Field(converter: Preferences::URIConverter)]
property database_url : URI = URI.parse("")
# Use polling to keep decryption function up to date
property decrypt_polling : Bool = false
# Used for crawling channels: threads should check all videos uploaded by a channel
property full_refresh : Bool = false

View File

@ -3,9 +3,9 @@
# IPv6 addresses.
#
class TCPSocket
def initialize(host : String, port, dns_timeout = nil, connect_timeout = nil, family = Socket::Family::UNSPEC)
def initialize(host, port, dns_timeout = nil, connect_timeout = nil, blocking = false, family = Socket::Family::UNSPEC)
Addrinfo.tcp(host, port, timeout: dns_timeout, family: family) do |addrinfo|
super(addrinfo.family, addrinfo.type, addrinfo.protocol)
super(addrinfo.family, addrinfo.type, addrinfo.protocol, blocking)
connect(addrinfo, timeout: connect_timeout) do |error|
close
error
@ -26,7 +26,7 @@ class HTTP::Client
end
hostname = @host.starts_with?('[') && @host.ends_with?(']') ? @host[1..-2] : @host
io = TCPSocket.new hostname, @port, @dns_timeout, @connect_timeout, @family
io = TCPSocket.new hostname, @port, @dns_timeout, @connect_timeout, family: @family
io.read_timeout = @read_timeout if @read_timeout
io.write_timeout = @write_timeout if @write_timeout
io.sync = false
@ -35,7 +35,7 @@ class HTTP::Client
if tls = @tls
tcp_socket = io
begin
io = OpenSSL::SSL::Socket::Client.new(tcp_socket, context: tls, sync_close: true, hostname: @host)
io = OpenSSL::SSL::Socket::Client.new(tcp_socket, context: tls, sync_close: true, hostname: @host.rchop('.'))
rescue exc
# don't leak the TCP socket when the SSL connection failed
tcp_socket.close

View File

@ -0,0 +1,325 @@
require "uri"
require "socket"
require "socket/tcp_socket"
require "socket/unix_socket"
{% if flag?(:advanced_debug) %}
require "io/hexdump"
{% end %}
private alias NetworkEndian = IO::ByteFormat::NetworkEndian
class Invidious::SigHelper
enum UpdateStatus
Updated
UpdateNotRequired
Error
end
# -------------------
# Payload types
# -------------------
abstract struct Payload
end
struct StringPayload < Payload
getter string : String
def initialize(str : String)
raise Exception.new("SigHelper: String can't be empty") if str.empty?
@string = str
end
def self.from_bytes(slice : Bytes)
size = IO::ByteFormat::NetworkEndian.decode(UInt16, slice)
if size == 0 # Error code
raise Exception.new("SigHelper: Server encountered an error")
end
if (slice.bytesize - 2) != size
raise Exception.new("SigHelper: String size mismatch")
end
if str = String.new(slice[2..])
return self.new(str)
else
raise Exception.new("SigHelper: Can't read string from socket")
end
end
def to_io(io)
# `.to_u16` raises if there is an overflow during the conversion
io.write_bytes(@string.bytesize.to_u16, NetworkEndian)
io.write(@string.to_slice)
end
end
private enum Opcode
FORCE_UPDATE = 0
DECRYPT_N_SIGNATURE = 1
DECRYPT_SIGNATURE = 2
GET_SIGNATURE_TIMESTAMP = 3
GET_PLAYER_STATUS = 4
end
private record Request,
opcode : Opcode,
payload : Payload?
# ----------------------
# High-level functions
# ----------------------
module Client
extend self
# Forces the server to re-fetch the YouTube player, and extract the necessary
# components from it (nsig function code, sig function code, signature timestamp).
def force_update : UpdateStatus
request = Request.new(Opcode::FORCE_UPDATE, nil)
value = send_request(request) do |bytes|
IO::ByteFormat::NetworkEndian.decode(UInt16, bytes)
end
case value
when 0x0000 then return UpdateStatus::Error
when 0xFFFF then return UpdateStatus::UpdateNotRequired
when 0xF44F then return UpdateStatus::Updated
else
code = value.nil? ? "nil" : value.to_s(base: 16)
raise Exception.new("SigHelper: Invalid status code received #{code}")
end
end
# Decrypt a provided n signature using the server's current nsig function
# code, and return the result (or an error).
def decrypt_n_param(n : String) : String?
request = Request.new(Opcode::DECRYPT_N_SIGNATURE, StringPayload.new(n))
n_dec = send_request(request) do |bytes|
StringPayload.from_bytes(bytes).string
end
return n_dec
end
# Decrypt a provided s signature using the server's current sig function
# code, and return the result (or an error).
def decrypt_sig(sig : String) : String?
request = Request.new(Opcode::DECRYPT_SIGNATURE, StringPayload.new(sig))
sig_dec = send_request(request) do |bytes|
StringPayload.from_bytes(bytes).string
end
return sig_dec
end
# Return the signature timestamp from the server's current player
def get_sts : UInt64?
request = Request.new(Opcode::GET_SIGNATURE_TIMESTAMP, nil)
return send_request(request) do |bytes|
IO::ByteFormat::NetworkEndian.decode(UInt64, bytes)
end
end
# Return the current player's version
def get_player : UInt32?
request = Request.new(Opcode::GET_PLAYER_STATUS, nil)
send_request(request) do |bytes|
has_player = (bytes[0] == 0xFF)
player_version = IO::ByteFormat::NetworkEndian.decode(UInt32, bytes[1..4])
end
return has_player ? player_version : nil
end
private def send_request(request : Request, &)
channel = Multiplexor::INSTANCE.send(request)
slice = channel.receive
return yield slice
rescue ex
LOGGER.debug("SigHelper: Error when sending a request")
LOGGER.trace(ex.inspect_with_backtrace)
return nil
end
end
# ---------------------
# Low level functions
# ---------------------
class Multiplexor
alias TransactionID = UInt32
record Transaction, channel = ::Channel(Bytes).new
@prng = Random.new
@mutex = Mutex.new
@queue = {} of TransactionID => Transaction
@conn : Connection
INSTANCE = new("")
def initialize(url : String)
@conn = Connection.new(url)
listen
end
def listen : Nil
raise "Socket is closed" if @conn.closed?
LOGGER.debug("SigHelper: Multiplexor listening")
# TODO: reopen socket if unexpectedly closed
spawn do
loop do
receive_data
Fiber.yield
end
end
end
def send(request : Request)
transaction = Transaction.new
transaction_id = @prng.rand(TransactionID)
# Add transaction to queue
@mutex.synchronize do
# On a 32-bits random integer, this should never happen. Though, just in case, ...
if @queue[transaction_id]?
raise Exception.new("SigHelper: Duplicate transaction ID! You got a shiny pokemon!")
end
@queue[transaction_id] = transaction
end
write_packet(transaction_id, request)
return transaction.channel
end
def receive_data
transaction_id, slice = read_packet
@mutex.synchronize do
if transaction = @queue.delete(transaction_id)
# Remove transaction from queue and send data to the channel
transaction.channel.send(slice)
LOGGER.trace("SigHelper: Transaction unqueued and data sent to channel")
else
raise Exception.new("SigHelper: Received transaction was not in queue")
end
end
end
# Read a single packet from the socket
private def read_packet : {TransactionID, Bytes}
# Header
transaction_id = @conn.read_bytes(UInt32, NetworkEndian)
length = @conn.read_bytes(UInt32, NetworkEndian)
LOGGER.trace("SigHelper: Recv transaction 0x#{transaction_id.to_s(base: 16)} / length #{length}")
if length > 67_000
raise Exception.new("SigHelper: Packet longer than expected (#{length})")
end
# Payload
slice = Bytes.new(length)
@conn.read(slice) if length > 0
LOGGER.trace("SigHelper: payload = #{slice}")
LOGGER.trace("SigHelper: Recv transaction 0x#{transaction_id.to_s(base: 16)} - Done")
return transaction_id, slice
end
# Write a single packet to the socket
private def write_packet(transaction_id : TransactionID, request : Request)
LOGGER.trace("SigHelper: Send transaction 0x#{transaction_id.to_s(base: 16)} / opcode #{request.opcode}")
io = IO::Memory.new(1024)
io.write_bytes(request.opcode.to_u8, NetworkEndian)
io.write_bytes(transaction_id, NetworkEndian)
if payload = request.payload
payload.to_io(io)
end
@conn.send(io)
@conn.flush
LOGGER.trace("SigHelper: Send transaction 0x#{transaction_id.to_s(base: 16)} - Done")
end
end
class Connection
@socket : UNIXSocket | TCPSocket
{% if flag?(:advanced_debug) %}
@io : IO::Hexdump
{% end %}
def initialize(host_or_path : String)
if host_or_path.empty?
host_or_path = "/tmp/inv_sig_helper.sock"
end
case host_or_path
when .starts_with?('/')
@socket = UNIXSocket.new(host_or_path)
when .starts_with?("tcp://")
uri = URI.parse(host_or_path)
@socket = TCPSocket.new(uri.host.not_nil!, uri.port.not_nil!)
else
uri = URI.parse("tcp://#{host_or_path}")
@socket = TCPSocket.new(uri.host.not_nil!, uri.port.not_nil!)
end
LOGGER.debug("SigHelper: Listening on '#{host_or_path}'")
{% if flag?(:advanced_debug) %}
@io = IO::Hexdump.new(@socket, output: STDERR, read: true, write: true)
{% end %}
@socket.sync = false
@socket.blocking = false
end
def closed? : Bool
return @socket.closed?
end
def close : Nil
if @socket.closed?
raise Exception.new("SigHelper: Can't close socket, it's already closed")
else
@socket.close
end
end
def flush(*args, **options)
@socket.flush(*args, **options)
end
def send(*args, **options)
@socket.send(*args, **options)
end
# Wrap IO functions, with added debug tooling if needed
{% for function in %w(read read_bytes write write_bytes) %}
def {{function.id}}(*args, **options)
{% if flag?(:advanced_debug) %}
@io.{{function.id}}(*args, **options)
{% else %}
@socket.{{function.id}}(*args, **options)
{% end %}
end
{% end %}
end
end

View File

@ -1,73 +1,46 @@
alias SigProc = Proc(Array(String), Int32, Array(String))
require "http/params"
require "./sig_helper"
struct DecryptFunction
@decrypt_function = [] of {SigProc, Int32}
@decrypt_time = Time.monotonic
struct Invidious::DecryptFunction
@last_update = Time.monotonic - 42.days
def initialize(@use_polling = true)
def initialize
self.check_update
end
def update_decrypt_function
@decrypt_function = fetch_decrypt_function
def check_update
now = Time.monotonic
if (now - @last_update) > 60.seconds
LOGGER.debug("Signature: Player might be outdated, updating")
Invidious::SigHelper::Client.force_update
@last_update = Time.monotonic
end
end
private def fetch_decrypt_function(id = "CvFH_6DNRCY")
document = YT_POOL.client &.get("/watch?v=#{id}&gl=US&hl=en").body
url = document.match(/src="(?<url>\/s\/player\/[^\/]+\/player_ias[^\/]+\/en_US\/base.js)"/).not_nil!["url"]
player = YT_POOL.client &.get(url).body
function_name = player.match(/^(?<name>[^=]+)=function\(\w\){\w=\w\.split\(""\);[^\. ]+\.[^( ]+/m).not_nil!["name"]
function_body = player.match(/^#{Regex.escape(function_name)}=function\(\w\){(?<body>[^}]+)}/m).not_nil!["body"]
function_body = function_body.split(";")[1..-2]
var_name = function_body[0][0, 2]
var_body = player.delete("\n").match(/var #{Regex.escape(var_name)}={(?<body>(.*?))};/).not_nil!["body"]
operations = {} of String => SigProc
var_body.split("},").each do |operation|
op_name = operation.match(/^[^:]+/).not_nil![0]
op_body = operation.match(/\{[^}]+/).not_nil![0]
case op_body
when "{a.reverse()"
operations[op_name] = ->(a : Array(String), _b : Int32) { a.reverse }
when "{a.splice(0,b)"
operations[op_name] = ->(a : Array(String), b : Int32) { a.delete_at(0..(b - 1)); a }
else
operations[op_name] = ->(a : Array(String), b : Int32) { c = a[0]; a[0] = a[b % a.size]; a[b % a.size] = c; a }
end
end
decrypt_function = [] of {SigProc, Int32}
function_body.each do |function|
function = function.lchop(var_name).delete("[].")
op_name = function.match(/[^\(]+/).not_nil![0]
value = function.match(/\(\w,(?<value>[\d]+)\)/).not_nil!["value"].to_i
decrypt_function << {operations[op_name], value}
end
return decrypt_function
def decrypt_nsig(n : String) : String?
self.check_update
return SigHelper::Client.decrypt_n_param(n)
rescue ex
LOGGER.debug(ex.message || "Signature: Unknown error")
LOGGER.trace(ex.inspect_with_backtrace)
return nil
end
def decrypt_signature(fmt : Hash(String, JSON::Any))
return "" if !fmt["s"]? || !fmt["sp"]?
def decrypt_signature(str : String) : String?
self.check_update
return SigHelper::Client.decrypt_sig(str)
rescue ex
LOGGER.debug(ex.message || "Signature: Unknown error")
LOGGER.trace(ex.inspect_with_backtrace)
return nil
end
sp = fmt["sp"].as_s
sig = fmt["s"].as_s.split("")
if !@use_polling
now = Time.monotonic
if now - @decrypt_time > 60.seconds || @decrypt_function.size == 0
@decrypt_function = fetch_decrypt_function
@decrypt_time = Time.monotonic
end
end
@decrypt_function.each do |proc, value|
sig = proc.call(sig, value)
end
return "&#{sp}=#{sig.join("")}"
def get_sts : UInt64?
self.check_update
return SigHelper::Client.get_sts
rescue ex
LOGGER.debug(ex.message || "Signature: Unknown error")
LOGGER.trace(ex.inspect_with_backtrace)
return nil
end
end

View File

@ -1,14 +0,0 @@
class Invidious::Jobs::UpdateDecryptFunctionJob < Invidious::Jobs::BaseJob
def begin
loop do
begin
DECRYPT_FUNCTION.update_decrypt_function
rescue ex
LOGGER.error("UpdateDecryptFunctionJob : #{ex.message}")
ensure
sleep 1.minute
Fiber.yield
end
end
end
end

View File

@ -1,3 +1,5 @@
private DECRYPT_FUNCTION = IV::DecryptFunction.new
enum VideoType
Video
Livestream
@ -98,20 +100,47 @@ struct Video
# Methods for parsing streaming data
def convert_url(fmt)
if cfr = fmt["signatureCipher"]?.try { |h| HTTP::Params.parse(h.as_s) }
sp = cfr["sp"]
url = URI.parse(cfr["url"])
params = url.query_params
LOGGER.debug("Videos: Decoding '#{cfr}'")
unsig = DECRYPT_FUNCTION.decrypt_signature(cfr["s"])
params[sp] = unsig if unsig
else
url = URI.parse(fmt["url"].as_s)
params = url.query_params
end
n = DECRYPT_FUNCTION.decrypt_nsig(params["n"])
params["n"] = n if n
params["host"] = url.host.not_nil!
if region = self.info["region"]?.try &.as_s
params["region"] = region
end
url.query_params = params
LOGGER.trace("Videos: new url is '#{url}'")
return url.to_s
rescue ex
LOGGER.debug("Videos: Error when parsing video URL")
LOGGER.trace(ex.inspect_with_backtrace)
return ""
end
def fmt_stream
return @fmt_stream.as(Array(Hash(String, JSON::Any))) if @fmt_stream
fmt_stream = info["streamingData"]?.try &.["formats"]?.try &.as_a.map &.as_h || [] of Hash(String, JSON::Any)
fmt_stream.each do |fmt|
if s = (fmt["cipher"]? || fmt["signatureCipher"]?).try { |h| HTTP::Params.parse(h.as_s) }
s.each do |k, v|
fmt[k] = JSON::Any.new(v)
end
fmt["url"] = JSON::Any.new("#{fmt["url"]}#{DECRYPT_FUNCTION.decrypt_signature(fmt)}")
end
fmt_stream = info.dig?("streamingData", "formats")
.try &.as_a.map &.as_h || [] of Hash(String, JSON::Any)
fmt["url"] = JSON::Any.new("#{fmt["url"]}&host=#{URI.parse(fmt["url"].as_s).host}")
fmt["url"] = JSON::Any.new("#{fmt["url"]}&region=#{self.info["region"]}") if self.info["region"]?
fmt_stream.each do |fmt|
fmt["url"] = JSON::Any.new(self.convert_url(fmt))
end
fmt_stream.sort_by! { |f| f["width"]?.try &.as_i || 0 }
@ -121,21 +150,17 @@ struct Video
def adaptive_fmts
return @adaptive_fmts.as(Array(Hash(String, JSON::Any))) if @adaptive_fmts
fmt_stream = info["streamingData"]?.try &.["adaptiveFormats"]?.try &.as_a.map &.as_h || [] of Hash(String, JSON::Any)
fmt_stream.each do |fmt|
if s = (fmt["cipher"]? || fmt["signatureCipher"]?).try { |h| HTTP::Params.parse(h.as_s) }
s.each do |k, v|
fmt[k] = JSON::Any.new(v)
end
fmt["url"] = JSON::Any.new("#{fmt["url"]}#{DECRYPT_FUNCTION.decrypt_signature(fmt)}")
end
fmt["url"] = JSON::Any.new("#{fmt["url"]}&host=#{URI.parse(fmt["url"].as_s).host}")
fmt["url"] = JSON::Any.new("#{fmt["url"]}&region=#{self.info["region"]}") if self.info["region"]?
fmt_stream = info.dig("streamingData", "adaptiveFormats")
.try &.as_a.map &.as_h || [] of Hash(String, JSON::Any)
fmt_stream.each do |fmt|
fmt["url"] = JSON::Any.new(self.convert_url(fmt))
end
fmt_stream.sort_by! { |f| f["width"]?.try &.as_i || 0 }
@adaptive_fmts = fmt_stream
return @adaptive_fmts.as(Array(Hash(String, JSON::Any)))
end

View File

@ -2,6 +2,8 @@
# This file contains youtube API wrappers
#
private STS_FETCHER = IV::DecryptFunction.new
module YoutubeAPI
extend self
@ -272,7 +274,7 @@ module YoutubeAPI
# Return, as a Hash, the "context" data required to request the
# youtube API endpoints.
#
private def make_context(client_config : ClientConfig | Nil) : Hash
private def make_context(client_config : ClientConfig | Nil, video_id = "dQw4w9WgXcQ") : Hash
# Use the default client config if nil is passed
client_config ||= DEFAULT_CLIENT_CONFIG
@ -292,7 +294,7 @@ module YoutubeAPI
if client_config.screen == "EMBED"
client_context["thirdParty"] = {
"embedUrl" => "https://www.youtube.com/embed/dQw4w9WgXcQ",
"embedUrl" => "https://www.youtube.com/embed/#{video_id}",
} of String => String | Int64
end
@ -453,19 +455,29 @@ module YoutubeAPI
params : String,
client_config : ClientConfig | Nil = nil
)
# Playback context, separate because it can be different between clients
playback_ctx = {
"html5Preference" => "HTML5_PREF_WANTS",
"referer" => "https://www.youtube.com/watch?v=#{video_id}",
} of String => String | Int64
if {"WEB", "TVHTML5"}.any? { |s| client_config.name.starts_with? s }
if sts = STS_FETCHER.get_sts
playback_ctx["signatureTimestamp"] = sts.to_i64
end
end
# JSON Request data, required by the API
data = {
"contentCheckOk" => true,
"videoId" => video_id,
"context" => self.make_context(client_config),
"context" => self.make_context(client_config, video_id),
"racyCheckOk" => true,
"user" => {
"lockedSafetyMode" => false,
},
"playbackContext" => {
"contentPlaybackContext" => {
"html5Preference": "HTML5_PREF_WANTS",
},
"contentPlaybackContext" => playback_ctx,
},
}