class Concurrent::RubyThreadPoolExecutor
@!macro thread_pool_executor @!macro thread_pool_options @!visibility private
Constants
- DEFAULT_MAX_POOL_SIZE
@!macro thread_pool_executor_constant_default_max_pool_size
- DEFAULT_MAX_QUEUE_SIZE
@!macro thread_pool_executor_constant_default_max_queue_size
- DEFAULT_MIN_POOL_SIZE
@!macro thread_pool_executor_constant_default_min_pool_size
- DEFAULT_SYNCHRONOUS
@!macro thread_pool_executor_constant_default_synchronous
- DEFAULT_THREAD_IDLETIMEOUT
@!macro thread_pool_executor_constant_default_thread_timeout
Attributes
@!macro thread_pool_executor_attr_reader_idletime
@!macro thread_pool_executor_attr_reader_max_length
@!macro thread_pool_executor_attr_reader_max_queue
@!macro thread_pool_executor_attr_reader_min_length
@!macro thread_pool_executor_attr_reader_synchronous
Public Class Methods
@!macro thread_pool_executor_method_initialize
Concurrent::RubyExecutorService::new
# File lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb, line 47 def initialize(opts = {}) super(opts) end
Public Instance Methods
@!macro thread_pool_executor_method_active_count
# File lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb, line 67 def active_count synchronize do @pool.length - @ready.length end end
@!macro executor_service_method_can_overflow_question
# File lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb, line 74 def can_overflow? synchronize { ns_limited_queue? } end
@!macro thread_pool_executor_attr_reader_completed_task_count
# File lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb, line 62 def completed_task_count synchronize { @completed_task_count } end
@!macro thread_pool_executor_attr_reader_largest_length
# File lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb, line 52 def largest_length synchronize { @largest_length } end
@!macro thread_pool_executor_attr_reader_length
# File lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb, line 79 def length synchronize { @pool.length } end
@!macro thread_pool_executor_method_prune_pool
# File lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb, line 139 def prune_pool deprecated "#prune_pool has no effect and will be removed in next the release, see https://github.com/ruby-concurrency/concurrent-ruby/pull/1082." end
removes the worker if it can be pruned
@return [true, false] if the worker was pruned
@!visibility private
# File lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb, line 104 def prune_worker(worker) synchronize do if ns_prunable_capacity > 0 remove_worker worker true else false end end end
@!macro thread_pool_executor_attr_reader_queue_length
# File lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb, line 84 def queue_length synchronize { @queue.length } end
@!visibility private
# File lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb, line 124 def ready_worker(worker, last_message) synchronize { ns_ready_worker worker, last_message } end
@!macro thread_pool_executor_attr_reader_remaining_capacity
# File lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb, line 89 def remaining_capacity synchronize do if ns_limited_queue? @max_queue - @queue.length else -1 end end end
@!visibility private
# File lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb, line 116 def remove_worker(worker) synchronize do ns_remove_ready_worker worker ns_remove_busy_worker worker end end
@!macro thread_pool_executor_attr_reader_scheduled_task_count
# File lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb, line 57 def scheduled_task_count synchronize { @scheduled_task_count } end
@!visibility private
# File lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb, line 129 def worker_died(worker) synchronize { ns_worker_died worker } end
@!visibility private
# File lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb, line 134 def worker_task_completed synchronize { @completed_task_count += 1 } end
Private Instance Methods
creates new worker which has to receive work to do after it’s added @return [nil, Worker] nil of max capacity is reached
@!visibility private
# File lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb, line 257 def ns_add_busy_worker return if @pool.size >= @max_length @workers_counter += 1 @pool << (worker = Worker.new(self, @workers_counter)) @largest_length = @pool.length if @pool.length > @largest_length worker end
tries to assign task to a worker, tries to get one from @ready or to create new one @return [true, false] if task is assigned to a worker
@!visibility private
# File lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb, line 217 def ns_assign_worker(*args, &task) # keep growing if the pool is not at the minimum yet worker, _ = (@ready.pop if @pool.size >= @min_length) || ns_add_busy_worker if worker worker << [task, args] true else false end rescue ThreadError # Raised when the operating system refuses to create the new thread return false end
tries to enqueue task @return [true, false] if enqueued
@!visibility private
# File lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb, line 235 def ns_enqueue(*args, &task) return false if @synchronous if !ns_limited_queue? || @queue.size < @max_queue @queue << [task, args] true else false end end
@!visibility private
# File lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb, line 178 def ns_execute(*args, &task) ns_reset_if_forked if ns_assign_worker(*args, &task) || ns_enqueue(*args, &task) @scheduled_task_count += 1 nil else fallback_action(*args, &task) end end
@!visibility private
# File lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb, line 146 def ns_initialize(opts) @min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i @max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i @idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i @max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i @synchronous = opts.fetch(:synchronous, DEFAULT_SYNCHRONOUS) @fallback_policy = opts.fetch(:fallback_policy, :abort) raise ArgumentError.new("`synchronous` cannot be set unless `max_queue` is 0") if @synchronous && @max_queue > 0 raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy) raise ArgumentError.new("`max_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if @max_length < DEFAULT_MIN_POOL_SIZE raise ArgumentError.new("`max_threads` cannot be greater than #{DEFAULT_MAX_POOL_SIZE}") if @max_length > DEFAULT_MAX_POOL_SIZE raise ArgumentError.new("`min_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if @min_length < DEFAULT_MIN_POOL_SIZE raise ArgumentError.new("`min_threads` cannot be more than `max_threads`") if min_length > max_length @pool = [] # all workers @ready = [] # used as a stash (most idle worker is at the start) @queue = [] # used as queue # @ready or @queue is empty at all times @scheduled_task_count = 0 @completed_task_count = 0 @largest_length = 0 @workers_counter = 0 @ruby_pid = $$ # detects if Ruby has forked end
@!visibility private
# File lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb, line 205 def ns_kill_execution # TODO log out unprocessed tasks in queue # TODO try to shutdown first? @pool.each(&:kill) @pool.clear @ready.clear end
@!visibility private
# File lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb, line 173 def ns_limited_queue? @max_queue != 0 end
@return [Integer] number of excess idle workers which can be removed without
going below min_length, or all workers if not running
@!visibility private
# File lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb, line 305 def ns_prunable_capacity if running? [@pool.size - @min_length, @ready.size].min else @pool.size end end
handle ready worker, giving it new job or assigning back to @ready
@!visibility private
# File lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb, line 269 def ns_ready_worker(worker, last_message, success = true) task_and_args = @queue.shift if task_and_args worker << task_and_args else # stop workers when !running?, do not return them to @ready if running? raise unless last_message @ready.push([worker, last_message]) else worker.stop end end end
removes a worker which is not tracked in @ready
@!visibility private
# File lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb, line 287 def ns_remove_busy_worker(worker) @pool.delete(worker) stopped_event.set if @pool.empty? && !running? true end
@!visibility private
# File lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb, line 294 def ns_remove_ready_worker(worker) if index = @ready.index { |rw, _| rw == worker } @ready.delete_at(index) end true end
@!visibility private
# File lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb, line 314 def ns_reset_if_forked if $$ != @ruby_pid @queue.clear @ready.clear @pool.clear @scheduled_task_count = 0 @completed_task_count = 0 @largest_length = 0 @workers_counter = 0 @ruby_pid = $$ end end
@!visibility private
# File lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb, line 190 def ns_shutdown_execution ns_reset_if_forked if @pool.empty? # nothing to do stopped_event.set end if @queue.empty? # no more tasks will be accepted, just stop all workers @pool.each(&:stop) end end
@!visibility private
# File lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb, line 247 def ns_worker_died(worker) ns_remove_busy_worker worker replacement_worker = ns_add_busy_worker ns_ready_worker replacement_worker, Concurrent.monotonic_time, false if replacement_worker end