ash/lib/sat_solver/sat_solver.ex
2024-04-04 18:21:51 -04:00

1190 lines
33 KiB
Elixir

defmodule Ash.SatSolver do
@moduledoc """
Tools for working with the satsolver that drives filter subset checking (for authorization)
"""
alias Ash.Filter
alias Ash.Query.{BooleanExpression, Not, Ref}
@dialyzer {:nowarn_function, overlap?: 2}
@doc """
Creates tuples of a boolean statement.
i.e `b(1 and 2) #=> {:and, 1, 2}`
"""
defmacro b(statement) do
value =
Macro.prewalk(
statement,
fn
{:and, _, [left, right]} ->
quote do
{:and, unquote(left), unquote(right)}
end
{:or, _, [left, right]} ->
quote do
{:or, unquote(left), unquote(right)}
end
{:not, _, [value]} ->
quote do
{:not, unquote(value)}
end
other ->
other
end
)
quote do
unquote(value)
|> Ash.SatSolver.balance()
end
end
@doc false
def balance({op, left, right}) do
left = balance(left)
right = balance(right)
[left, right] = Enum.sort([left, right])
{op, left, right}
end
def balance({:not, {:not, right}}) do
balance(right)
end
def balance({:not, statement}) do
{:not, balance(statement)}
end
def balance(other), do: other
@doc "Returns true if the candidate filter returns the same or less data than the filter"
def strict_filter_subset(filter, candidate) do
case {filter, candidate} do
{%{expression: nil}, %{expression: nil}} ->
true
{%{expression: nil}, _candidate_expr} ->
true
{_filter_expr, %{expression: nil}} ->
false
{filter, candidate} ->
do_strict_filter_subset(filter, candidate)
end
end
defp do_strict_filter_subset(filter, candidate) do
filter =
Filter.map(filter, fn
%Ref{} = ref ->
%{ref | input?: false}
other ->
other
end)
candidate =
Filter.map(candidate, fn
%Ref{} = ref ->
%{ref | input?: false}
other ->
other
end)
expr = BooleanExpression.new(:and, filter.expression, candidate.expression)
case transform_and_solve(
filter.resource,
expr
) do
{:error, :unsatisfiable} ->
false
{:ok, _scenario} ->
expr = BooleanExpression.new(:and, Not.new(filter.expression), candidate.expression)
case transform_and_solve(
filter.resource,
expr
) do
{:error, :unsatisfiable} ->
true
{:ok, _scenario} ->
:maybe
end
end
end
defp filter_to_expr(nil), do: nil
defp filter_to_expr(false), do: false
defp filter_to_expr(true), do: true
defp filter_to_expr(%Filter{expression: expression}), do: filter_to_expr(expression)
defp filter_to_expr(%{__predicate__?: _} = op_or_func), do: op_or_func
defp filter_to_expr(%Ash.Query.Exists{} = exists), do: exists
defp filter_to_expr(%Ash.Query.Parent{} = parent), do: parent
defp filter_to_expr(%Ash.CustomExpression{expression: expression}), do: expression
defp filter_to_expr(%Not{expression: expression}), do: b(not filter_to_expr(expression))
defp filter_to_expr(%BooleanExpression{op: op, left: left, right: right}) do
{op, filter_to_expr(left), filter_to_expr(right)}
end
defp filter_to_expr(expr) do
raise ArgumentError, message: "Invalid filter expression #{inspect(expr)}"
end
@doc "Prepares a filter for comparison"
def transform(resource, expression) do
expression
|> consolidate_relationships(resource)
|> upgrade_related_filters_to_join_keys(resource)
|> build_expr_with_predicate_information()
end
@doc "Calls `transform/2` and solves the expression"
def transform_and_solve(resource, expression) do
resource
|> transform(expression)
|> to_cnf()
|> elem(0)
|> solve_expression()
end
defp upgrade_related_filters_to_join_keys(
%BooleanExpression{op: op, left: left, right: right},
resource
) do
BooleanExpression.new(
op,
upgrade_related_filters_to_join_keys(left, resource),
upgrade_related_filters_to_join_keys(right, resource)
)
end
defp upgrade_related_filters_to_join_keys(%Not{expression: expression}, resource) do
Not.new(upgrade_related_filters_to_join_keys(expression, resource))
end
defp upgrade_related_filters_to_join_keys(
%Ash.Query.Exists{path: path, expr: expr} = exists,
resource
) do
related = Ash.Resource.Info.related(resource, path)
%{exists | expr: upgrade_related_filters_to_join_keys(expr, related)}
end
defp upgrade_related_filters_to_join_keys(
%{__operator__?: true, left: left, right: right} = op,
resource
) do
%{op | left: upgrade_ref(left, resource), right: upgrade_ref(right, resource)}
end
defp upgrade_related_filters_to_join_keys(
%{__function__?: true, arguments: arguments} = function,
resource
) do
%{function | arguments: Enum.map(arguments, &upgrade_ref(&1, resource))}
end
defp upgrade_related_filters_to_join_keys(expr, _), do: expr
defp upgrade_ref({key, ref}, resource) when is_atom(key) do
{key, upgrade_ref(ref, resource)}
end
defp upgrade_ref(
%Ash.Query.Ref{attribute: attribute, relationship_path: path} = ref,
resource
)
when path != [] do
with relationship when not is_nil(relationship) <-
Ash.Resource.Info.relationship(resource, path),
true <- attribute.name == relationship.destination_attribute,
new_attribute when not is_nil(new_attribute) <-
Ash.Resource.Info.attribute(relationship.source, relationship.source_attribute) do
%{
ref
| relationship_path: :lists.droplast(path),
attribute: new_attribute,
resource: resource
}
else
_ ->
ref
end
end
defp upgrade_ref(other, _), do: other
defp consolidate_relationships(expression, resource) do
{replacements, _all_relationship_paths} =
expression
|> Filter.relationship_paths(true)
|> Enum.uniq()
|> Enum.reduce({%{}, []}, fn path, {replacements, kept_paths} ->
case find_synonymous_relationship_path(resource, kept_paths, path) do
nil ->
{replacements, [path | kept_paths]}
synonymous_path ->
Map.put(replacements, path, synonymous_path)
end
end)
do_consolidate_relationships(expression, replacements, resource)
end
defp do_consolidate_relationships(
%BooleanExpression{op: op, left: left, right: right},
replacements,
resource
) do
BooleanExpression.new(
op,
do_consolidate_relationships(left, replacements, resource),
do_consolidate_relationships(right, replacements, resource)
)
end
defp do_consolidate_relationships(%Not{expression: expression}, replacements, resource) do
Not.new(do_consolidate_relationships(expression, replacements, resource))
end
defp do_consolidate_relationships(
%Ash.Query.Exists{at_path: at_path, path: path, expr: expr} = exists,
replacements,
resource
) do
exists =
case Map.fetch(replacements, at_path) do
{:ok, replacement} when not is_nil(replacement) ->
%{exists | at_path: replacement}
:error ->
exists
end
related = Ash.Resource.Info.related(resource, at_path)
{replacements, _all_relationship_paths} =
expr
|> Filter.relationship_paths(true)
|> Enum.uniq()
|> Enum.reduce({%{}, []}, fn path, {replacements, kept_paths} ->
case find_synonymous_relationship_path(related, kept_paths, path) do
nil ->
{replacements, [path | kept_paths]}
synonymous_path ->
Map.put(replacements, path, synonymous_path)
end
end)
exists =
case Map.fetch(replacements, path) do
{:ok, replacement} when not is_nil(replacement) ->
%{exists | path: replacement}
:error ->
exists
end
full_related = Ash.Resource.Info.related(related, path)
%{exists | expr: consolidate_relationships(expr, full_related)}
end
defp do_consolidate_relationships(
%Ash.Query.Ref{relationship_path: path} = ref,
replacements,
_resource
)
when path != [] do
case Map.fetch(replacements, path) do
{:ok, replacement} when not is_nil(replacement) -> %{ref | relationship_path: replacement}
:error -> ref
end
end
defp do_consolidate_relationships(
%{__function__?: true, arguments: args} = func,
replacements,
resource
) do
%{func | arguments: Enum.map(args, &do_consolidate_relationships(&1, replacements, resource))}
end
defp do_consolidate_relationships(
%{__operator__?: true, left: left, right: right} = op,
replacements,
resource
) do
%{
op
| left: do_consolidate_relationships(left, replacements, resource),
right: do_consolidate_relationships(right, replacements, resource)
}
end
defp do_consolidate_relationships(other, _, _), do: other
defp find_synonymous_relationship_path(resource, paths, path) do
Enum.find_value(paths, fn candidate_path ->
if synonymous_relationship_paths?(resource, candidate_path, path) do
candidate_path
else
false
end
end)
end
@doc "Returns `true` if the relationship paths are synonymous from a data perspective"
def synonymous_relationship_paths?(
left_resource,
candidate,
search,
right_resource \\ nil
)
def synonymous_relationship_paths?(_, [], [], _), do: true
def synonymous_relationship_paths?(_, [], _, _), do: false
def synonymous_relationship_paths?(_, _, [], _), do: false
def synonymous_relationship_paths?(
left_resource,
[candidate_first | candidate_rest],
[first | rest],
right_resource
) do
right_resource = right_resource || left_resource
relationship = Ash.Resource.Info.relationship(left_resource, first)
candidate_relationship = Ash.Resource.Info.relationship(right_resource, candidate_first)
cond do
!relationship || !candidate_relationship ->
false
relationship.type == :many_to_many && candidate_relationship.type == :has_many ->
synonymous_relationship_paths?(left_resource, [relationship.join_relationship], [
candidate_first
]) && !Enum.empty?(candidate_rest) &&
synonymous_relationship_paths?(
left_resource,
candidate_rest,
rest,
right_resource
)
relationship.type == :has_many && candidate_relationship.type == :many_to_many ->
synonymous_relationship_paths?(left_resource, [relationship.name], [
candidate_relationship.join_relationship
]) && !Enum.empty?(rest) &&
synonymous_relationship_paths?(
left_resource,
candidate_rest,
rest,
right_resource
)
true ->
comparison_keys = [
:source_attribute,
:destination_attribute,
:source_attribute_on_join_resource,
:destination_attribute_on_join_resource,
:destination_attribute,
:destination,
:manual,
:sort,
:filter
]
Map.take(relationship, comparison_keys) ==
Map.take(candidate_relationship, comparison_keys) and
synonymous_relationship_paths?(relationship.destination, candidate_rest, rest)
end
end
defp build_expr_with_predicate_information(expression) do
expression = fully_simplify(expression)
all_predicates =
expression
|> Filter.list_predicates()
|> Enum.uniq()
comparison_expressions =
all_predicates
|> Enum.filter(fn %module{} ->
:erlang.function_exported(module, :compare, 2)
end)
|> Enum.reduce([], fn predicate, new_expressions ->
all_predicates
|> Enum.reject(&Kernel.==(&1, predicate))
|> Enum.filter(&shares_ref?(&1, predicate))
|> Enum.reduce(new_expressions, fn other_predicate, new_expressions ->
# With predicate as a and other_predicate as b
case Ash.Filter.Predicate.compare(predicate, other_predicate) do
:right_includes_left ->
# b || !a
[b(other_predicate or not predicate) | new_expressions]
:left_includes_right ->
# a || ! b
[b(predicate or not other_predicate) | new_expressions]
:mutually_inclusive ->
# (a && b) || (! a && ! b)
[
b((predicate and other_predicate) or (not predicate and not other_predicate))
| new_expressions
]
:mutually_exclusive ->
[b(not (other_predicate and predicate)) | new_expressions]
:mutually_exclusive_and_collectively_exhaustive ->
[
b(
not (other_predicate and predicate) and
not (not other_predicate and not predicate)
)
| new_expressions
]
_other ->
# If we can't tell, we assume that both could be true
new_expressions
end
end)
end)
|> Enum.uniq()
expression = filter_to_expr(expression)
expression_with_comparisons =
Enum.reduce(comparison_expressions, expression, fn comparison_expression, expression ->
b(comparison_expression and expression)
end)
all_predicates
|> Enum.map(& &1.__struct__)
|> Enum.uniq()
|> Enum.flat_map(fn struct ->
if :erlang.function_exported(struct, :bulk_compare, 1) do
struct.bulk_compare(all_predicates)
else
[]
end
end)
|> Enum.reduce(expression_with_comparisons, fn comparison_expression, expression ->
b(comparison_expression and expression)
end)
end
defp fully_simplify(expression) do
expression
|> do_fully_simplify()
|> lift_equals_out_of_in()
|> do_fully_simplify()
end
defp do_fully_simplify(expression) do
expression
|> simplify()
|> case do
^expression ->
expression
simplified ->
fully_simplify(simplified)
end
end
def lift_equals_out_of_in(expression) do
case find_non_equal_overlap(expression) do
nil ->
expression
non_equal_overlap ->
expression
|> split_in_expressions(non_equal_overlap)
|> lift_equals_out_of_in()
end
end
def find_non_equal_overlap(expression) do
Ash.Filter.find(expression, fn sub_expr ->
Ash.Filter.find(expression, fn sub_expr2 ->
# if has_call_or_expression?(sub_expr) || has_call_or_expression?(sub_expr2) do
# false
# else
overlap?(sub_expr, sub_expr2)
# end
end)
end)
end
defp new_in(base, right) do
case MapSet.size(right) do
1 ->
%Ash.Query.Operator.Eq{left: base.left, right: Enum.at(right, 0)}
_ ->
%Ash.Query.Operator.In{left: base.left, right: right}
end
end
def split_in_expressions(
%Ash.Query.Operator.In{right: right} = sub_expr,
%Ash.Query.Operator.Eq{right: value} = non_equal_overlap
) do
if overlap?(non_equal_overlap, sub_expr) do
Ash.Query.BooleanExpression.new(
:or,
new_in(sub_expr, MapSet.delete(right, value)),
non_equal_overlap
)
else
sub_expr
end
end
def split_in_expressions(
%Ash.Query.Operator.In{} = sub_expr,
%Ash.Query.Operator.In{right: right} = non_equal_overlap
) do
if overlap?(sub_expr, non_equal_overlap) do
diff = MapSet.difference(sub_expr.right, right)
if MapSet.size(diff) == 0 do
Enum.reduce(sub_expr.right, nil, fn var, acc ->
BooleanExpression.new(:or, %Ash.Query.Operator.Eq{left: sub_expr.left, right: var}, acc)
end)
else
new_right = new_in(sub_expr, MapSet.intersection(sub_expr.right, right))
Ash.Query.BooleanExpression.new(
:or,
new_in(sub_expr, diff),
new_right
)
end
else
sub_expr
end
end
def split_in_expressions(nil, _), do: nil
def split_in_expressions(%Ash.Filter{expression: expression} = filter, non_equal_overlap),
do: %{filter | expression: split_in_expressions(expression, non_equal_overlap)}
def split_in_expressions(%Not{expression: expression} = not_expr, non_equal_overlap),
do: %{not_expr | expression: split_in_expressions(expression, non_equal_overlap)}
def split_in_expressions(
%BooleanExpression{left: left, right: right} = expr,
non_equal_overlap
),
do: %{
expr
| left: split_in_expressions(left, non_equal_overlap),
right: split_in_expressions(right, non_equal_overlap)
}
def split_in_expressions(other, _), do: other
defp overlap?(
%Ash.Query.Operator.In{left: left, right: %MapSet{} = left_right},
%Ash.Query.Operator.In{left: left, right: %MapSet{} = right_right}
) do
if MapSet.equal?(left_right, right_right) do
false
else
overlap? =
left_right
|> MapSet.intersection(right_right)
|> MapSet.size()
|> Kernel.>(0)
if overlap? do
true
else
false
end
end
end
defp overlap?(_, %Ash.Query.Operator.Eq{right: %Ref{}}),
do: false
defp overlap?(%Ash.Query.Operator.Eq{right: %Ref{}}, _),
do: false
defp overlap?(
%Ash.Query.Operator.Eq{left: left, right: left_right},
%Ash.Query.Operator.In{left: left, right: %MapSet{} = right_right}
) do
MapSet.member?(right_right, left_right)
end
defp overlap?(_left, _right) do
false
end
@doc "Returns a statement expressing that the predicates are mutually exclusive."
def mutually_exclusive(predicates, acc \\ [])
def mutually_exclusive([], acc), do: acc
def mutually_exclusive([predicate | rest], acc) do
new_acc =
Enum.reduce(rest, acc, fn other_predicate, acc ->
[b(not (predicate and other_predicate)) | acc]
end)
mutually_exclusive(rest, new_acc)
end
@doc "Returns a statement expressing that the predicates are mutually exclusive and collectively exhaustive."
def mutually_exclusive_and_collectively_exhaustive([]), do: []
def mutually_exclusive_and_collectively_exhaustive([_]), do: []
def mutually_exclusive_and_collectively_exhaustive(predicates) do
mutually_exclusive(predicates) ++
Enum.flat_map(predicates, fn predicate ->
other_predicates = Enum.reject(predicates, &(&1 == predicate))
other_predicates_union =
Enum.reduce(other_predicates, nil, fn other_predicate, expr ->
if expr do
b(expr or other_predicate)
else
other_predicate
end
end)
b(
not (predicate and other_predicates_union) and
not (not predicate and not other_predicates_union)
)
end)
end
@doc "Returns `b(not (left and right))`"
def left_excludes_right(left, right) do
b(not (left and right))
end
@doc "Returns `b(not (right and left))`"
def right_excludes_left(left, right) do
b(not (right and left))
end
@doc "Returns a statement expressing that the predicates are mutually inclusive"
def mutually_inclusive(predicates, acc \\ [])
def mutually_inclusive([], acc), do: acc
def mutually_inclusive([predicate | rest], acc) do
new_acc =
Enum.reduce(rest, acc, fn other_predicate, acc ->
[b((predicate and other_predicate) or (not predicate and not other_predicate)) | acc]
end)
mutually_exclusive(rest, new_acc)
end
@doc "Returns `b(not (right and not left))`"
def right_implies_left(left, right) do
b(not (right and not left))
end
@doc "Returns `b(not (left and not right))`"
def left_implies_right(left, right) do
b(not (left and not right))
end
defp shares_ref?(left, right) do
any_refs_in_common?(refs(left), refs(right))
end
defp any_refs_in_common?(left_refs, right_refs) do
Enum.any?(left_refs, &(&1 in right_refs))
end
defp refs(%{__operator__?: true, left: left, right: right}) do
Enum.filter([left, right], &match?(%Ref{}, &1)) |> Enum.map(&Map.put(&1, :input?, false))
end
defp refs(%{__function__?: true, arguments: arguments}) do
Enum.filter(arguments, &match?(%Ref{}, &1)) |> Enum.map(&Map.put(&1, :input?, false))
end
defp refs(_), do: []
defp simplify(%BooleanExpression{op: op, left: left, right: right}) do
BooleanExpression.new(op, simplify(left), simplify(right))
end
defp simplify(%Not{expression: expression}) do
Not.new(simplify(expression))
end
defp simplify(%Ash.Query.Exists{expr: expr} = exists) do
%{exists | expr: simplify(expr)}
end
defp simplify(%mod{__predicate__?: true} = predicate) do
if :erlang.function_exported(mod, :simplify, 1) do
predicate
|> mod.simplify()
|> Kernel.||(predicate)
else
predicate
end
end
defp simplify(other), do: other
@doc """
Transforms a statement to Conjunctive Normal Form(CNF), as lists of lists of integers.
"""
def to_cnf(expression) do
expression_with_constants = b(true and not false and expression)
{bindings, expression} = extract_bindings(expression_with_constants)
expression
|> to_conjunctive_normal_form()
|> lift_clauses()
|> negations_to_negative_numbers()
|> Enum.map(fn scenario ->
Enum.sort_by(scenario, fn item ->
{abs(item), item}
end)
end)
|> group_predicates(bindings)
|> rebind()
|> unique_clauses()
end
defp unique_clauses({clauses, bindings}) do
{Enum.uniq(clauses), bindings}
end
defp group_predicates(expression, bindings) do
case expression do
[_] ->
{expression, bindings}
scenarios ->
Enum.reduce(scenarios, {[], bindings}, fn scenario, {new_scenarios, bindings} ->
{scenario, bindings} = group_scenario_predicates(scenario, scenarios, bindings)
{[scenario | new_scenarios], bindings}
end)
end
end
defp group_scenario_predicates(scenario, all_scenarios, bindings) do
scenario
|> Ash.SatSolver.Utils.ordered_sublists()
|> Enum.filter(&can_be_used_as_group?(&1, all_scenarios, bindings))
|> Enum.sort_by(&length/1)
|> remove_overlapping()
|> Enum.reduce({scenario, bindings}, fn group, {scenario, bindings} ->
bindings = add_group_binding(bindings, group)
{Ash.SatSolver.Utils.replace_ordered_sublist(scenario, group, bindings[:groups][group]),
bindings}
end)
end
defp remove_overlapping([]), do: []
defp remove_overlapping([item | rest]) do
if Enum.any?(item, fn n ->
Enum.any?(rest, &(n in &1 or -n in &1))
end) do
remove_overlapping(rest)
else
[item | remove_overlapping(rest)]
end
end
@doc "Remaps integers back to clauses"
def unbind(expression, %{temp_bindings: temp_bindings, old_bindings: old_bindings}) do
expression =
Enum.flat_map(expression, fn statement ->
Enum.flat_map(statement, fn var ->
neg? = var < 0
old_binding = temp_bindings[abs(var)]
case old_bindings[:reverse_groups][old_binding] do
nil ->
if neg? do
[-old_binding]
else
[old_binding]
end
group ->
if neg? do
Enum.map(group, &(-&1))
else
[{:expand, group}]
end
end
end)
|> expand_groups()
end)
{expression, old_bindings}
end
defp expand_groups(expression) do
do_expand_groups(expression)
end
defp do_expand_groups([]), do: [[]]
defp do_expand_groups([{:expand, group} | rest]) do
Enum.flat_map(group, fn var ->
Enum.map(do_expand_groups(rest), fn future ->
[var | future]
end)
end)
end
defp do_expand_groups([var | rest]) do
Enum.map(do_expand_groups(rest), fn future ->
[var | future]
end)
end
defp rebind({expression, bindings}) do
{expression, temp_bindings} =
Enum.reduce(expression, {[], %{current: 0}}, fn statement, {statements, acc} ->
{statement, acc} =
Enum.reduce(statement, {[], acc}, fn var, {statement, acc} ->
case acc[:reverse][abs(var)] do
nil ->
binding = acc.current + 1
value =
if var < 0 do
-binding
else
binding
end
{[value | statement],
acc
|> Map.put(:current, binding)
|> Map.update(:reverse, %{abs(var) => binding}, &Map.put(&1, abs(var), binding))
|> Map.put(binding, abs(var))}
value ->
value =
if var < 0 do
-value
else
value
end
{[value | statement], acc}
end
end)
{[Enum.reverse(statement) | statements], acc}
end)
bindings_with_old_bindings = %{temp_bindings: temp_bindings, old_bindings: bindings}
{expression, bindings_with_old_bindings}
end
defp can_be_used_as_group?(group, scenarios, bindings) do
Map.has_key?(bindings[:groups] || %{}, group) ||
Enum.all?(scenarios, fn scenario ->
has_no_overlap?(scenario, group) || group_in_scenario?(scenario, group)
end)
end
defp has_no_overlap?(scenario, group) do
not Enum.any?(group, fn group_predicate ->
Enum.any?(scenario, fn scenario_predicate ->
abs(group_predicate) == abs(scenario_predicate)
end)
end)
end
defp group_in_scenario?(scenario, group) do
Ash.SatSolver.Utils.is_ordered_sublist_of?(group, scenario)
end
defp add_group_binding(bindings, group) do
if bindings[:groups][group] do
bindings
else
binding = bindings[:current]
bindings
|> Map.put_new(:reverse_groups, %{})
|> Map.update!(:reverse_groups, &Map.put(&1, binding, group))
|> Map.put_new(:groups, %{})
|> Map.update!(:groups, &Map.put(&1, group, binding))
|> Map.put(:current, binding + 1)
end
end
cond do
Application.compile_env(:ash, :sat_testing) ->
def solve_expression(cnf) do
Module.concat([System.get_env("SAT_SOLVER") || "Picosat"]).solve(cnf)
end
Code.ensure_loaded?(Picosat) ->
def solve_expression(cnf) do
Picosat.solve(cnf)
end
Code.ensure_loaded?(SimpleSat) ->
def solve_expression(cnf) do
SimpleSat.solve(cnf)
end
true ->
def solve_expression(_cnf) do
if Code.ensure_loaded?(Picosat) || Code.ensure_loaded?(SimpleSat) do
raise """
No SAT solver available, although one was loaded. This typically means you need to run `mix deps.compile ash --force`
If that doesn't work, please ensure that one of the following dependencies is present in your application to use sat solver features:
* `:picosat_elixir` (recommended) - A NIF wrapper around the PicoSAT SAT solver. Fast, production ready, battle tested.
* `:simple_sat` - A pure Elixir SAT solver. Slower than PicoSAT, but no NIF dependency.
"""
else
raise """
No SAT solver available.
Please add one of the following dependencies to your application to use sat solver features:
* `:picosat_elixir` (recommended) - A NIF wrapper around the PicoSAT SAT solver. Fast, production ready, battle tested.
* `:simple_sat` - A pure Elixir SAT solver. Slower than PicoSAT, but no NIF dependency.
"""
end
end
end
def contains?([], _), do: false
def contains?([_ | t] = l1, l2) do
List.starts_with?(l1, l2) or contains?(t, l2)
end
def solutions_to_predicate_values(solution, bindings) do
Enum.reduce(solution, %{true: [], false: []}, fn var, state ->
fact = Map.get(bindings, abs(var))
if is_nil(fact) do
raise Ash.Error.Framework.AssumptionFailed.exception(
message: """
A fact from the sat solver had no corresponding bound fact:
Bindings:
#{inspect(bindings)}
Missing:
#{inspect(var)}
"""
)
end
Map.put(state, fact, var > 0)
end)
end
defp extract_bindings(expr, bindings \\ %{current: 1})
defp extract_bindings({operator, left, right}, bindings) do
{bindings, left_extracted} = extract_bindings(left, bindings)
{bindings, right_extracted} = extract_bindings(right, bindings)
{bindings, {operator, left_extracted, right_extracted}}
end
defp extract_bindings({:not, value}, bindings) do
{bindings, extracted} = extract_bindings(value, bindings)
{bindings, b(not extracted)}
end
defp extract_bindings(value, %{current: current} = bindings) do
current_binding =
Enum.find(bindings, fn {key, binding_value} ->
key != :current && binding_value == value
end)
case current_binding do
nil ->
new_bindings =
bindings
|> Map.put(:current, current + 1)
|> Map.put(current, value)
{new_bindings, current}
{binding, _} ->
{bindings, binding}
end
end
# A helper function for formatting to the same output we'd give to picosat
@doc false
def to_picosat(clauses, variable_count) do
clause_count = Enum.count(clauses)
formatted_input =
Enum.map_join(clauses, "\n", fn clause ->
format_clause(clause) <> " 0"
end)
"p cnf #{variable_count} #{clause_count}\n" <> formatted_input
end
defp negations_to_negative_numbers(clauses) do
Enum.map(
clauses,
fn
{:not, var} when is_integer(var) ->
[negate_var(var)]
var when is_integer(var) ->
[var]
clause ->
Enum.map(clause, fn
{:not, var} -> negate_var(var)
var -> var
end)
end
)
end
defp negate_var(var, multiplier \\ -1)
defp negate_var({:not, value}, multiplier) do
negate_var(value, multiplier * -1)
end
defp negate_var(value, multiplier), do: value * multiplier
defp format_clause(clause) do
Enum.map_join(clause, " ", fn
{:not, var} -> "-#{var}"
var -> "#{var}"
end)
end
defp lift_clauses({:and, left, right}) do
lift_clauses(left) ++ lift_clauses(right)
end
defp lift_clauses({:or, left, right}) do
[lift_or_clauses(left) ++ lift_or_clauses(right)]
end
defp lift_clauses(value), do: [[value]]
defp lift_or_clauses({:or, left, right}) do
lift_or_clauses(left) ++ lift_or_clauses(right)
end
defp lift_or_clauses(value), do: [value]
defp to_conjunctive_normal_form(expression) do
expression
|> demorgans_law()
|> distributive_law()
end
defp distributive_law(expression) do
distributive_law_applied = apply_distributive_law(expression)
if expression == distributive_law_applied do
expression
else
distributive_law(distributive_law_applied)
end
end
defp apply_distributive_law({:or, left, {:and, right1, right2}}) do
left_distributed = apply_distributive_law(left)
{:and, {:or, left_distributed, apply_distributive_law(right1)},
{:or, left_distributed, apply_distributive_law(right2)}}
end
defp apply_distributive_law({:or, {:and, left1, left2}, right}) do
right_distributed = apply_distributive_law(right)
{:and, {:or, apply_distributive_law(left1), right_distributed},
{:or, apply_distributive_law(left2), right_distributed}}
end
defp apply_distributive_law({:not, expression}) do
{:not, apply_distributive_law(expression)}
end
defp apply_distributive_law({operator, left, right}) when operator in [:and, :or] do
{operator, apply_distributive_law(left), apply_distributive_law(right)}
end
defp apply_distributive_law(var) when is_integer(var) do
var
end
defp demorgans_law(expression) do
demorgans_law_applied = apply_demorgans_law(expression)
if expression == demorgans_law_applied do
expression
else
demorgans_law(demorgans_law_applied)
end
end
defp apply_demorgans_law({:not, {:and, left, right}}) do
{:or, {:not, apply_demorgans_law(left)}, {:not, apply_demorgans_law(right)}}
end
defp apply_demorgans_law({:not, {:or, left, right}}) do
{:and, {:not, left}, {:not, right}}
end
defp apply_demorgans_law({operator, left, right}) when operator in [:or, :and] do
{operator, apply_demorgans_law(left), apply_demorgans_law(right)}
end
defp apply_demorgans_law({:not, expression}) do
{:not, apply_demorgans_law(expression)}
end
defp apply_demorgans_law(var) when is_integer(var) do
var
end
end