3 # EventMachine adapter is useful for either asynchronous requests
4 # when in EM reactor loop or for making parallel requests in
6 class EMHttp < Faraday::Adapter
8 def connection_config(env)
10 configure_proxy(options, env)
11 configure_timeout(options, env)
12 configure_socket(options, env)
13 configure_ssl(options, env)
17 def request_config(env)
19 :body => read_body(env),
20 :head => env[:request_headers],
22 # :file => 'path/to/file', # stream data off disk
24 configure_compression(options, env)
30 body.respond_to?(:read) ? body.read : body
33 def configure_proxy(options, env)
34 if proxy = request_options(env)[:proxy]
36 :host => proxy[:uri].host,
37 :port => proxy[:uri].port,
38 :authorization => [proxy[:user], proxy[:password]]
43 def configure_socket(options, env)
44 if bind = request_options(env)[:bind]
52 def configure_ssl(options, env)
53 if env[:url].scheme == 'https' && env[:ssl]
55 :cert_chain_file => env[:ssl][:ca_file],
56 :verify_peer => env[:ssl].fetch(:verify, true)
61 def configure_timeout(options, env)
62 timeout, open_timeout = request_options(env).values_at(:timeout, :open_timeout)
63 options[:connect_timeout] = options[:inactivity_timeout] = timeout
64 options[:connect_timeout] = open_timeout if open_timeout
67 def configure_compression(options, env)
68 if env[:method] == :get and not options[:head].key? 'accept-encoding'
69 options[:head]['accept-encoding'] = 'gzip, compressed'
73 def request_options(env)
82 self.supports_parallel = true
84 def self.setup_parallel_manager(options = nil)
94 def perform_request(env)
96 manager = env[:parallel_manager]
98 perform_single_request(env).
99 callback { env[:response].finish(env) }
102 unless EventMachine.reactor_running?
104 # start EM, block until request is completed
106 perform_single_request(env).
107 callback { EventMachine.stop }.
109 error = error_message(client)
113 raise_error(error) if error
115 # EM is running: instruct upstream that this is an async request
116 env[:parallel_manager] = true
117 perform_single_request(env).
118 callback { env[:response].finish(env) }.
120 # TODO: no way to communicate the error in async mode
121 raise NotImplementedError
125 rescue EventMachine::Connectify::CONNECTError => err
126 if err.message.include?("Proxy Authentication Required")
127 raise Error::ConnectionFailed, %{407 "Proxy Authentication Required "}
129 raise Error::ConnectionFailed, err
132 if defined?(OpenSSL) && OpenSSL::SSL::SSLError === err
133 raise Faraday::SSLError, err
139 # TODO: reuse the connection to support pipelining
140 def perform_single_request(env)
141 req = EventMachine::HttpRequest.new(env[:url], connection_config(env))
142 req.setup_request(env[:method], request_config(env)).callback { |client|
143 save_response(env, client.response_header.status, client.response) do |resp_headers|
144 client.response_header.each do |name, value|
145 resp_headers[name.to_sym] = value
151 def error_message(client)
152 client.error or "request failed"
156 errklass = Faraday::Error::ClientError
157 if msg == Errno::ETIMEDOUT
158 errklass = Faraday::Error::TimeoutError
159 msg = "request timed out"
160 elsif msg == Errno::ECONNREFUSED
161 errklass = Faraday::Error::ConnectionFailed
162 msg = "connection refused"
163 elsif msg == "connection closed by server"
164 errklass = Faraday::Error::ConnectionFailed
170 !!env[:parallel_manager]
173 # The parallel manager is designed to start an EventMachine loop
174 # and block until all registered requests have been completed.
181 @registered_procs = []
188 def running?() @running end
192 perform_request { yield }
194 @registered_procs << Proc.new
200 if @num_registered > 0
203 @registered_procs.each do |proc|
204 perform_request(&proc)
208 raise Faraday::Error::ClientError, @errors.first || "connection failed"
217 client.callback { @num_succeeded += 1; check_finished }
218 client.errback { @errors << client.error; check_finished }
222 if @num_succeeded + @errors.size == @num_registered
234 warn "Warning: no such file to load -- openssl. Make sure it is installed if you want HTTPS support"
236 require 'faraday/adapter/em_http_ssl_patch'
237 end if Faraday::Adapter::EMHttp.loaded?