Class: Opal::BuilderScheduler::Prefork::ForkSet

Inherits:
Array
  • Object
show all
Defined in:
opal/lib/opal/builder_scheduler/prefork.rb

Instance Method Summary collapse

Constructor Details

#initialize(count, &block) ⇒ ForkSet

Returns a new instance of ForkSet.



23
24
25
26
27
28
29
# File 'opal/lib/opal/builder_scheduler/prefork.rb', line 23

# Starts out as an empty array, records the construction arguments and
# immediately spawns the first fork via create_fork.
#
# count - kept in @count; get_events only adds forks while length < @count.
# block - kept in @block; create_fork forwards it to every Fork.new call.
def initialize(count, &block)
  super([])

  @count = count
  @block = block

  create_fork
end

Instance Method Details

#close ⇒ Object



70
71
72
# File 'opal/lib/opal/builder_scheduler/prefork.rb', line 70

# Sends #close to every member of the set, in order.
# Returns the set itself (the result of #each).
def close
  each { |worker| worker.close }
end

#create_fork ⇒ Object



62
63
64
# File 'opal/lib/opal/builder_scheduler/prefork.rb', line 62

# Builds one more Fork, handing it this set and the block stored in @block,
# and appends it to the set. Returns the set itself.
def create_fork
  worker = Fork.new(self, &@block)
  push(worker)
end

#from_io(io, type) ⇒ Object



66
67
68
# File 'opal/lib/opal/builder_scheduler/prefork.rb', line 66

# Looks up the first member whose `type` method returns a value equal to `io`.
#
# io   - the object to match against.
# type - name of the method to invoke on each member (get_events passes
#        :read_io or :write_io); it is invoked with __send__.
#
# Returns the matching member, or nil when none matches.
def from_io(io, type)
  each do |worker|
    return worker if worker.__send__(type) == io
  end

  nil
end

#get_events(queue_length) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'opal/lib/opal/builder_scheduler/prefork.rb', line 31

# Blocks in IO.select until something happens, then reports it.
#
# Two kinds of readiness are watched:
# - the read_io of every member (a worker has returned some data), and
# - the write_io of up to queue_length randomly sampled members (a worker
#   is ready to receive data), so we only look for as many ready workers
#   as we have work for.
#
# Returns [events, idles]:
# - events holds one [member, *member.recv] entry per readable read_io
# - idles holds the members whose write_io was reported writable
def get_events(queue_length)
  readers = map(&:read_io)
  writers = sample(queue_length).map(&:write_io)

  ready = IO.select(readers, writers, [])
  return [[], []] unless ready

  readable, writable = ready

  events = readable.map do |reader|
    worker = from_io(reader, :read_io)
    [worker, *worker.recv]
  end

  idles = writable.map { |writer| from_io(writer, :write_io) }

  # Grow the pool progressively, since not all of the workers may be
  # needed at the time: while below @count, each call adds a fork with a
  # 1-in-6 chance. The number 6 came out of some trial and error on a
  # Ryzen machine.
  #
  # Do note that prefork may happen more than once.
  create_fork if length < @count && rand(6) == 1

  [events, idles]
end

#wait ⇒ Object



74
75
76
# File 'opal/lib/opal/builder_scheduler/prefork.rb', line 74

# Sends #wait to every member of the set, in order.
# Returns the set itself (the result of #each).
def wait
  each { |worker| worker.wait }
end