lib/sisfc/simulation.rb
# frozen_string_literal: true
require_relative './data_center'
require_relative './event'
require_relative './generator'
require_relative './sorted_array'
require_relative './statistics'
require_relative './vm'
require_relative './latency_manager'
module SISFC
  # Discrete-event simulation of data centers serving customer requests.
  # #evaluate_allocation runs one simulation for a candidate VM allocation
  # and converts the costs computed by the evaluator into a fitness value.
  class Simulation
    # Returned by #evaluate_allocation when the candidate allocation has no VM
    # at all for some service component type.
    UNFEASIBLE_ALLOCATION_EVALUATION = { unfeasible_configuration: -Float::INFINITY }.freeze

    # Simulation start time, copied from the configuration by
    # #evaluate_allocation once an allocation passes the feasibility check.
    attr_reader :start_time

    # @param opts [Hash] :configuration is the simulation configuration;
    #   :evaluator must respond to
    #   #evaluate_business_impact(stats, per_workflow_and_customer_stats, vm_allocation)
    def initialize(opts = {})
      @configuration = opts[:configuration]
      @evaluator = opts[:evaluator]
    end

    # Schedules an event on the event queue.
    #
    # The simulation passes itself to VMs (see ET_REQUEST_FORWARDING handling),
    # which presumably schedule their own events through this method.
    #
    # @param type one of the Event::ET_* constants
    # @param data event payload (request attributes or a request)
    # @param time simulated time at which the event fires
    # @param destination VM the event is addressed to, or nil
    def new_event(type, data, time, destination)
      @event_queue << Event.new(type, data, time, destination)
    end

    # @return the current simulated time
    def now
      @current_time
    end

    # Simulates the candidate VM allocation and returns its fitness, i.e. the
    # opposite of the sum of the costs computed by the evaluator (higher is
    # better).
    #
    # @param vm_allocation [Array<Hash>] entries with :component_type, :dc_id,
    #   :vm_size and :vm_num keys: :vm_num VMs of that component type and size
    #   are created in data center :dc_id
    # @return [Float, Hash] the fitness; -Float::MAX when a data center
    #   refuses one of the VMs; UNFEASIBLE_ALLOCATION_EVALUATION when some
    #   service component type has no VM at all
    # @raise [RuntimeError] when a request cannot be served by the configured
    #   data centers, or on inconsistent simulation time or latencies
    def evaluate_allocation(vm_allocation)
      # TODO: allow to define which feasibility controls to run in simulation
      # configuration. Here we hardcode a simple feasibility check: fail unless
      # there is at least one vm for each software component.
      unless every_component_type_allocated?(vm_allocation)
        puts "====== Unfeasible allocation ======\n" +
             "costs: #{UNFEASIBLE_ALLOCATION_EVALUATION}\n" +
             "vm_allocation: #{vm_allocation.inspect}\n" +
             "=======================================\n"
        # NOTE(review): this path returns a Hash while every other path returns
        # a Float -- confirm that callers handle both.
        return UNFEASIBLE_ALLOCATION_EVALUATION
      end

      # seeds; each one is optional
      seeds = @configuration.seeds
      latency_seed = seeds[:communication_latencies]
      service_time_seed = seeds[:service_times]
      next_component_seed = seeds[:next_component_selection]
      next_component_rng = next_component_seed ? Random.new(next_component_seed) : Random.new

      # create latency manager
      latency_manager = if latency_seed
                          LatencyManager.new(@configuration.latency_models, seed: latency_seed)
                        else
                          LatencyManager.new(@configuration.latency_models)
                        end

      # setup simulation start and current time
      @current_time = @start_time = @configuration.start_time

      # repositories
      data_center_repository = @configuration.data_centers.map { |k, v| [k, DataCenter.new(id: k, **v)] }.to_h
      customer_repository = @configuration.customers
      workflow_type_repository = @configuration.workflow_types

      # statistics: global, per workflow type and customer, and counters of
      # received requests per workflow type and customer
      stats = Statistics.new
      per_workflow_and_customer_stats =
        build_per_workflow_and_customer_stats(workflow_type_repository, customer_repository)
      reqs_received_per_workflow_and_customer = workflow_type_repository.keys.map do |wft_id|
        [wft_id, customer_repository.keys.map { |c_id| [c_id, 0] }.to_h]
      end.to_h

      # create the VMs; a data center refusing a VM makes the allocation
      # unfeasible. We return -Float::MAX instead of, e.g., -Float::INFINITY,
      # because the latter would break optimization tools: we want a very low
      # but comparable value. It must be low, not high, because the fitness is
      # the opposite of the costs (higher means better).
      return -Float::MAX unless allocate_vms(vm_allocation, data_center_repository, service_time_seed)

      # create event queue
      @event_queue = SortedArray.new

      # generate first request
      rg = RequestGenerator.new(@configuration.request_generation)
      req_attrs = rg.generate
      new_event(Event::ET_REQUEST_GENERATION, req_attrs, req_attrs[:generation_time], nil)

      # schedule end of simulation, if configured
      end_time = @configuration.end_time
      new_event(Event::ET_END_OF_SIMULATION, nil, end_time, nil) unless end_time.nil?

      # requests arriving at or before this time are left out of the statistics
      warmup_threshold = @configuration.start_time + @configuration.warmup_duration.to_i

      # bookkeeping counters
      # NOTE(review): requests_being_worked_on and requests_forwarded_to_other_dcs
      # are updated but never reported.
      requests_being_worked_on = 0
      requests_forwarded_to_other_dcs = 0
      current_event = 0
      requests_arrived = 0

      # launch simulation
      until @event_queue.empty?
        e = @event_queue.shift
        current_event += 1

        # sanity check on simulation time flow
        if @current_time > e.time
          raise "Error: simulation time inconsistency for event #{current_event} " +
                "e.type=#{e.type} @current_time=#{@current_time}, e.time=#{e.time}"
        end
        @current_time = e.time

        case e.type
        when Event::ET_REQUEST_GENERATION
          req_attrs = e.data

          # prefer the data center at the customer's location
          customer_location_id = customer_repository.dig(req_attrs[:customer_id], :location_id)
          dc_at_customer_location = data_center_repository.values.find { |dc| dc.location_id == customer_location_id }
          raise "No data center found at location id #{customer_location_id}!" unless dc_at_customer_location

          # the request enters at a data center able to run the first
          # component of its workflow
          workflow = workflow_type_repository[req_attrs[:workflow_type_id]]
          first_component_name = workflow[:component_sequence][0][:name]
          closest_dc =
            if dc_at_customer_location.has_vms_of_type?(first_component_name)
              dc_at_customer_location
            else
              # fall back to a random suitable data center; draw from the seeded
              # RNG so that seeded simulations are reproducible
              data_center_repository.values
                                    .select { |dc| dc.has_vms_of_type?(first_component_name) }
                                    .sample(random: next_component_rng)
            end
          raise "Invalid configuration! No VMs of type #{first_component_name} found!" unless closest_dc

          arrival_time = @current_time +
                         latency_manager.sample_latency_between(customer_location_id, closest_dc.location_id)
          # merge (not merge!) so the event payload is not mutated
          new_req = Request.new(**req_attrs.merge(initial_data_center_id: closest_dc.dcid,
                                                  arrival_time: arrival_time))

          # schedule arrival of current request and generation of the next one
          new_event(Event::ET_REQUEST_ARRIVAL, new_req, arrival_time, nil)
          req_attrs = rg.generate
          new_event(Event::ET_REQUEST_GENERATION, req_attrs, req_attrs[:generation_time], nil)

        when Event::ET_REQUEST_ARRIVAL
          req = e.data
          requests_arrived += 1
          if (requests_arrived % 10_000).zero?
            puts "requests_arrived: #{requests_arrived}"
            $stdout.flush
          end

          data_center = data_center_repository[req.data_center_id]
          reqs_received_per_workflow_and_customer[req.workflow_type_id][req.customer_id] += 1

          # forward the request to a random VM running the next component
          workflow = workflow_type_repository[req.workflow_type_id]
          next_component_name = workflow[:component_sequence][req.next_step][:name]
          vm = data_center.get_random_vm(next_component_name, random: next_component_rng)
          new_event(Event::ET_REQUEST_FORWARDING, req, e.time, vm)

          # update stats, ignoring requests that arrived during warmup
          if req.arrival_time > warmup_threshold
            requests_being_worked_on += 1
            stats.request_received
            per_workflow_and_customer_stats[req.workflow_type_id][req.customer_id].request_received
          end

        # Leave these events for when we add VM migration support
        # when Event::ET_VM_SUSPEND
        # when Event::ET_VM_RESUME

        when Event::ET_REQUEST_FORWARDING
          # hand the request over to its destination VM
          e.destination.new_request(self, e.data, e.time)

        when Event::ET_WORKFLOW_STEP_COMPLETED
          req = e.data
          # tell the old vm that it can start processing another request
          e.destination.request_finished(self, e.time)

          data_center = data_center_repository[req.data_center_id]
          workflow = workflow_type_repository[req.workflow_type_id]

          if req.next_step < workflow[:component_sequence].size
            # get a random VM providing the next service component type
            next_component_name = workflow[:component_sequence][req.next_step][:name]
            new_vm = data_center.get_random_vm(next_component_name, random: next_component_rng)
            # this is the request's time of arrival at the new VM
            forwarding_time = e.time

            # there might not be a VM of the type we need in the current data
            # center, so look in the other data centers, in random order
            unless new_vm
              other_dcs = data_center_repository.values
                                                .select { |dc| dc != data_center && dc.has_vms_of_type?(next_component_name) }
                                                .shuffle(random: next_component_rng)
              other_dcs.each do |dc|
                new_vm = dc.get_random_vm(next_component_name, random: next_component_rng)
                next unless new_vm

                # move the request to the new data center and charge the
                # transmission time between the two locations
                req.data_center_id = dc.dcid
                transmission_time = latency_manager.sample_latency_between(data_center.location_id, dc.location_id)
                raise "Negative transmission time (#{transmission_time})!" unless transmission_time >= 0.0

                req.update_transfer_time(transmission_time)
                forwarding_time += transmission_time
                requests_forwarded_to_other_dcs += 1
                break
              end
            end

            # make sure we actually found a VM
            unless new_vm
              raise "Cannot find VM running a component of type " +
                    "#{next_component_name} in any data center!"
            end

            new_event(Event::ET_REQUEST_FORWARDING, req, forwarding_time, new_vm)
          else
            # workflow is finished: send the response back to the customer
            transmission_time = latency_manager.sample_latency_between(
              data_center.location_id,
              customer_repository.dig(req.customer_id, :location_id)
            )
            raise "Negative transmission time (#{transmission_time})!" unless transmission_time >= 0.0

            req.update_transfer_time(transmission_time)
            new_event(Event::ET_REQUEST_CLOSURE, req, e.time + transmission_time, nil)
          end

        when Event::ET_REQUEST_CLOSURE
          req = e.data
          req.finished_processing(e.time)

          # update stats, ignoring requests that arrived during warmup
          if req.arrival_time > warmup_threshold
            requests_being_worked_on -= 1
            stats.record_request(req)
            per_workflow_and_customer_stats[req.workflow_type_id][req.customer_id].record_request(req)
          end

        when Event::ET_END_OF_SIMULATION
          puts "#{e.time}: end simulation"
          break
        end
      end

      costs = @evaluator.evaluate_business_impact(stats, per_workflow_and_customer_stats, vm_allocation)
      puts "====== Evaluating new allocation ======\n" +
           "costs: #{costs}\n" +
           "vm_allocation: #{vm_allocation.inspect}\n" +
           "stats: #{stats.to_s}\n" +
           "per_workflow_and_customer_stats: #{per_workflow_and_customer_stats.to_s}\n" +
           "=======================================\n"

      # we want to minimize the cost, so we define fitness as the opposite of
      # the sum of all costs incurred
      -costs.values.inject(0.0, :+)
    end

    private

    # Whether vm_allocation has at least one entry for every service
    # component type declared in the configuration.
    def every_component_type_allocated?(vm_allocation)
      @configuration.service_component_types.all? do |sc_id, _|
        vm_allocation.any? { |x| x[:component_type] == sc_id }
      end
    end

    # Builds the nested Hash workflow type id => customer id => Statistics,
    # passing to Statistics.new the custom_stats entry matching that pair
    # (or an empty Hash when there is none).
    def build_per_workflow_and_customer_stats(workflow_type_repository, customer_repository)
      workflow_type_repository.keys.map do |wft_id|
        per_customer = customer_repository.keys.map do |c_id|
          custom = @configuration.custom_stats.find do |x|
            x[:customer_id] == c_id && x[:workflow_type_id] == wft_id
          end
          [c_id, Statistics.new(custom || {})]
        end.to_h
        [wft_id, per_customer]
      end.to_h
    end

    # Creates the VMs requested by vm_allocation (ids assigned sequentially
    # from 0), stores them in @vms and registers each one in its data center.
    #
    # @return [Boolean] false, after logging the offending data center to
    #   stderr, as soon as a data center refuses a VM; true otherwise
    def allocate_vms(vm_allocation, data_center_repository, service_time_seed)
      @vms = []
      vmid = 0
      vm_allocation.each do |opts|
        stdist = @configuration.service_component_types[opts[:component_type]][:service_time_distribution]
        opts[:vm_num].times do
          vm = if service_time_seed
                 VM.new(vmid, opts[:dc_id], opts[:vm_size], stdist, seed: service_time_seed)
               else
                 VM.new(vmid, opts[:dc_id], opts[:vm_size], stdist)
               end
          @vms << vm
          unless data_center_repository[opts[:dc_id]].add_vm(vm, opts[:component_type])
            $stderr.puts "====== Unfeasible allocation at data center #{opts[:dc_id]} ======"
            $stderr.flush
            return false
          end
          vmid += 1
        end
      end
      true
    end
  end
end