lib/petri_net/graph/graph.rb
require 'graphviz'
require 'graphviz/theory'
require 'rgl/adjacency'
require 'rgl/dijkstra'
class PetriNet::InfiniteReachabilityGraphError < RuntimeError
end
class PetriNet::Graph < PetriNet::Base
# The PetriNet this graph belongs to
attr_reader :net
# all nodes from this graph
attr_reader :nodes
# all edges of this graph
attr_reader :edges
def initialize(net, options = Hash.new)
@net = net
@objects = Array.new
@nodes = Hash.new
@edges = Hash.new
@name = net.name
@type = "Reachability"
if options['unlimited'].nil?
@unlimited = true
else
@unlimited = options['unlimited']
end
end
def add_node(node)
if node.validate && (!@objects.include? node)
@objects[node.id] = node
@nodes[node.name] = node.id
node.graph = self
return node.id
end
if @objects.include? node
res = (@objects.index node) * -1
return res
end
return false
end
alias_method :add_node!, :add_node
def add_edge(edge)
if (edge.validate && (!@edges.include? edge.name))
@objects[edge.id] = edge
@edges[edge.name] = edge.id
edge.graph = self
edge.source.outputs << edge.id
edge.destination.inputs << edge.id
return edge.id
end
return false
end
def get_edge(source, dest)
res = nil
@edges.each_value do |edge|
if @objects[edge].source == source && @objects[edge].destination == dest
res = @objects[edge]
end
end
res
end
# Add an object to the Graph.
def <<(object)
case object.class.to_s
when "Array"
object.each {|o| self << o}
when "PetriNet::ReachabilityGraph::Edge"
add_edge(object)
when "PetriNet::ReachabilityGraph::Node"
add_node(object)
else
raise "(PetriNet::ReachabilityGraph) Unknown object #{object.class}."
end
self
end
alias_method :add_object, :<<
def get_node(node)
if node.class.to_s == "Fixnum"
return @objects[node]
end
if node.class.to_s == "Array"
return @objects.select{|o| o.class.to_s == "PetriNet::ReachabilityGraph::Node" && o.markings == node}.first
end
end
def get_object(id)
@objects[id]
end
def get_nodes
res = Array.new
@nodes.each_value do |n|
res << @objects[n]
end
res
end
def infinite?
@nodes.each_value do |node|
return true if @objects[node].infinite?
end
false
end
def to_gv(output = 'png', filename = '')
g = generate_gv
if filename.empty?
filename = "#{@name}_graph.png"
end
g.output( :png => filename ) if output == 'png'
g.output
end
def generate_gv(named = true)
g = GraphViz.new( :G, :type => :digraph )
@nodes.each_value do |node|
if named
label = (@net.get_place_from_marking @objects[node].markings).to_s
else
label = @objects[node].markings.to_s
end
gv_node = g.add_nodes( label )
gv_node.set do |n|
n.label = '*' + label + '*' if @objects[node].start
end
end
@edges.each_value do |edge|
if named
label1 = (@net.get_place_from_marking @objects[edge].source.markings).to_s
else
label1 = @objects[edge].source.markings.to_s
end
if named
label2 = (@net.get_place_from_marking @objects[edge].destination.markings).to_s
else
label2 = @objects[edge].destination.markings.to_s
end
gv_edge = g.add_edges( label1, label2 )
gv_edge.set do |e|
e.label = @objects[edge].transition
end
end
@gv = g
end
def generate_rgl
g = RGL::DirectedAdjacencyGraph.new
@nodes.each_value do |node|
g.add_vertex @objects[node].markings.to_s
end
@edges.each_value do |edge|
g.add_edge @objects[edge].source.markings.to_s, @objects[edge].destination.markings.to_s
end
@rgl = g
end
def to_s
str = "#{@type} Graph [#{@name}]\n"
str += "----------------------------\n"
str += "Description: #{@description}\n"
str += "Filename: #{@filename}\n"
str += "\n"
str += "Nodes\n"
str += "----------------------------\n"
@nodes.each_value {|p| str += @objects[p].to_s + "\n" }
str += "\n"
str += "Edges\n"
str += "----------------------------\n"
@edges.each_value {|t| str += @objects[t].to_s + "\n" }
str += "\n"
return str
end
def get_rgl
if @rgl.nil?
generate_rgl
end
@rgl
end
def cycles
get_rgl.cycles
end
def shortest_path(start, destination)
g = get_rgl
weights = lambda { |edge| 1 }
g.dijkstra_shortest_path(weights, start.to_s, destination.to_s)
end
def path_probability(path)
sanitize_probabilities
prob = 1
counter = 0
path.each do |node|
edge = get_edge(path[counter+1], node)
prob = prob * edge.probability unless edge.nil? # last node has no pre-edge
counter = counter += 1
end
prob
end
def node_probability(start, node)
paths = get_paths_without_loops(start, node)
prob = 0
paths.each do |path|
prob = prob + (path_probability path)
end
prob
end
def best_path(start, node)
paths = get_paths_without_loops(start, node)
prob = 0
res_path = nil
paths.each do |path|
if (path_probability path) >= prob
prob = (path_probability path)
res_path = path
end
end
[res_path,prob]
end
def worst_path(start, node)
paths = get_paths_without_loops(start, node)
prob = 1
res_path = nil
paths.each do |path|
if (path_probability path) <= prob
prob = (path_probability path)
res_path = path
end
end
[res_path,prob]
end
def get_most_relative_influence_on_path(start, node, factor = 0.5)
prob_counter = 0
prob_diff = Array.new
paths = get_paths_without_loops(start, node)
paths.each do |path|
counter = 0
path.each do |curr_node|
prob_before = node_probability start, node
unless node == path[-1]
edge = get_edge(path[counter + 1], curr_node)
save = edge.probability unless edge.nil?
edge.probability += (1 - edge.probability) * factor unless edge.nil?
prob_after = node_probability start, node unless edge.nil?
edge.probability = save unless edge.nil?
prob_diff[prob_counter] = [prob_after - prob_before, [curr_node, path[counter+1]]] unless edge.nil?
prob_counter += 1 unless edge.nil?
counter += 1
end
end
end
prob_diff.sort{|x,y| x[0] <=> y[0]}.last
end
def get_most_absolute_influence_on_path(start, node, sum = 0.01)
prob_counter = 0
prob_diff = Array.new
paths = get_paths_without_loops(start, node)
paths.each do |path|
counter = 0
path.each do |curr_node|
prob_before = node_probability start, node
unless node == path[-1]
edge = get_edge(path[counter + 1], curr_node)
save = edge.probability unless edge.nil?
edge.probability = edge.probability + sum unless edge.nil?
if !edge.nil? && edge.probability > 1
edge.probability = 1
end
prob_after = node_probability start, node unless edge.nil?
edge.probability = save unless edge.nil?
prob_diff[prob_counter] = [prob_after - prob_before, [curr_node, path[counter+1]]] unless edge.nil?
prob_counter += 1 unless edge.nil?
counter += 1
end
end
end
prob_diff.sort{|x,y| x[0] <=> y[0]}.last
end
def get_paths_without_loops(start, goal)
get_paths_without_loops_helper(get_node(start), get_node(goal))
end
def sanitize_probabilities
@nodes.each_value do |node|
prob = 1.0
@objects[node].outputs.each do |edge|
prob = prob + @objects[edge].probability unless @objects[edge].probability.nil?
end
@objects[node].outputs.each do |edge|
@objects[edge].probability = prob/@objects[node].outputs.size if @objects[edge].probability.nil?
end
end
end
private
def get_paths_without_loops_helper(start, goal, reverse_paths = Array.new, reverse_path = Array.new)
if goal == start
reverse_path << goal
reverse_paths << reverse_path
return nil
end
if reverse_path.include? goal
return nil
end
path = Array.new
goal.inputs.each do |input|
get_paths_without_loops_helper(start, @objects[input].source, reverse_paths, reverse_path.clone << goal)
end
reverse_paths
end
end