wip: stream spike.

This commit is contained in:
James Harton 2023-07-11 16:44:25 +12:00
parent 54bbbf7274
commit c339eec0af
Signed by: james
GPG key ID: 90E82DAA13F624F4
9 changed files with 263 additions and 2 deletions

View file

@ -17,6 +17,11 @@ spark_locals_without_parens = [
debug: 2,
default: 0,
default: 1,
filter: 0,
filter: 1,
generator: 0,
generator: 1,
generator: 2,
group: 1,
group: 2,
input: 1,
@ -26,11 +31,16 @@ spark_locals_without_parens = [
matches?: 2,
max_retries: 1,
on: 1,
predicate: 1,
reject: 0,
reject: 1,
return: 1,
run: 1,
step: 1,
step: 2,
step: 3,
stream: 1,
stream: 2,
switch: 1,
switch: 2,
transform: 1,

View file

@ -483,6 +483,116 @@ defmodule Reactor.Dsl do
]
}
@argument_with_element @argument
|> Map.from_struct()
|> update_in(
[:examples],
&["argument :element, element(:stream_name)" | &1]
)
|> update_in([:schema, :source, :type], fn {:or, types} ->
{:or, [{:struct, Template.Element} | types]}
end)
|> update_in([:schema, :source, :doc], fn doc ->
String.replace(
doc,
"Reactor.Dsl.Argument",
"Reactor.Dsl.Stream.Argument"
)
end)
|> update_in([:imports], &[Dsl.Stream | &1])
|> then(&struct(Entity, &1))
@stream_filter %Entity{
name: :filter,
describe: """
Filter stream elements using a predicate.
""",
target: Dsl.Stream.Filter,
entities: [arguments: [@argument_with_element]],
schema: [
predicate: [
type: {:or, [{:mfa_or_fun, 1}, {:mfa_or_fun, 2}]},
required: false,
doc: """
The predicate to use to filter the stream.
"""
]
]
}
@stream_reject %Entity{
name: :reject,
describe: """
Reject stream elements using a predicate.
""",
target: Dsl.Stream.Reject,
entities: [arguments: [@argument_with_element]],
schema: [
predicate: [
type: {:or, [{:mfa_or_fun, 1}, {:mfa_or_fun, 2}]},
required: false,
doc: """
The predicate to use to reject elements from the stream.
"""
]
]
}
@stream_generator %Entity{
name: :generator,
describe: """
Generates a stream of values.
""",
target: Dsl.Stream.Generator,
args: [{:optional, :source}],
entities: [arguments: [@argument]],
schema: [
source: [
type:
{:or,
[
{:struct, Template.Input},
{:struct, Template.Result},
{:struct, Template.Value}
]},
required: false
],
run: [
type: {:or, [{:mfa_or_fun, 1}, {:mfa_or_fun, 2}]},
required: false,
doc: """
A function that can transform arguments into an enumerable.
Required if no `source` template is provided.
"""
]
]
}
@stream %Entity{
name: :stream,
describe: """
Lazily transforms a stream of values similar to the elixir `Stream` module.
""",
target: Dsl.Stream,
args: [:name],
identifier: :name,
entities: [
generator: [@stream_generator],
stages: [@stream_filter, @stream_reject]
],
recursive_as: :steps,
schema: [
name: [
type: :atom,
required: true,
doc: """
A unique identifier for the stream.
"""
]
]
}
@reactor %Section{
name: :reactor,
describe: "The top-level reactor DSL",
@ -495,7 +605,7 @@ defmodule Reactor.Dsl do
"""
]
],
entities: [@around, @debug, @group, @input, @step, @switch, @compose],
entities: [@around, @debug, @group, @input, @step, @stream, @switch, @compose],
top_level?: true
}

View file

@ -14,7 +14,8 @@ defmodule Reactor.Dsl.Argument do
@type t :: %Dsl.Argument{
name: atom,
source: Template.Input.t() | Template.Result.t() | Template.Value.t(),
source:
Template.Element.t() | Template.Input.t() | Template.Result.t() | Template.Value.t(),
transform: nil | (any -> any) | {module, keyword} | mfa,
__identifier__: any
}

29
lib/reactor/dsl/stream.ex Normal file
View file

@ -0,0 +1,29 @@
defmodule Reactor.Dsl.Stream do
@moduledoc """
The struct used to store the `stream` DSL entity.
See `d:Reactor.stream`.
"""
alias Reactor.{Dsl.Stream, Template.Element}
defstruct __identifier__: nil,
generator: nil,
name: nil,
stages: []
@type t :: %Stream{
__identifier__: any,
generator: Stream.Generator.t(),
name: atom,
stages: [stage]
}
@type stage :: Stream.Filter.t() | Stream.Reject.t()
@doc """
The `element` template helpers for the Reactor stream DSL.
"""
@spec element(atom) :: Element.t()
def element(stream_name), do: %Element{name: stream_name}
end

View file

@ -0,0 +1,20 @@
defmodule Reactor.Dsl.Stream.Filter do
@moduledoc """
The struct used to store the `filter` DSL entity.
See `d:Reactor.stream.filter`.
"""
alias Reactor.Dsl.Argument
defstruct __identifier__: nil,
arguments: [],
predicate: nil
@type t :: %__MODULE__{
__identifier__: any,
arguments: [Argument.t()],
predicate:
mfa | (Reactor.inputs() -> any) | (Reactor.inputs(), Reactor.context() -> any)
}
end

View file

@ -0,0 +1,22 @@
defmodule Reactor.Dsl.Stream.Generator do
@moduledoc """
The struct used to store the `generator` DSL entity.
See `d:Reactor.stream.generator`.
"""
alias Reactor.{Dsl.Argument, Template}
defstruct __identifier__: nil,
arguments: [],
run: nil,
source: nil
@type t :: %__MODULE__{
__identifier__: any,
arguments: [Argument.t()],
source: nil | Template.Input.t() | Template.Result.t() | Template.Value.t(),
run:
nil | mfa | (Reactor.inputs() -> any) | (Reactor.inputs(), Reactor.context() -> any)
}
end

View file

@ -0,0 +1,20 @@
defmodule Reactor.Dsl.Stream.Reject do
@moduledoc """
The struct used to store the `reject` DSL entity.
See `d:Reactor.stream.reject`.
"""
alias Reactor.Dsl.Argument
defstruct __identifier__: nil,
arguments: [],
predicate: nil
@type t :: %__MODULE__{
__identifier__: any,
arguments: [Argument.t()],
predicate:
mfa | (Reactor.inputs() -> any) | (Reactor.inputs(), Reactor.context() -> any)
}
end

View file

@ -0,0 +1,9 @@
defmodule Reactor.Template.Element do
@moduledoc """
The `element` template.
"""
defstruct name: nil
@type t :: %__MODULE__{name: atom}
end

View file

@ -0,0 +1,40 @@
defmodule Reactor.Dsl.StreamTest do
@moduledoc false
use ExUnit.Case, async: true
defmodule StreamReactor do
@moduledoc false
use Reactor
input :low
input :high
stream :only_primes do
generator do
argument :low, input(:low)
argument :high, input(:high)
run fn %{low: low, high: high} ->
Range.new(low, high)
end
end
filter do
argument :element, element(:only_primes)
predicate(fn
%{element: 1} ->
true
%{element: 2} ->
false
%{element: element} ->
2
|> round(element / 2)
|> Enum.any?(&rem(element, &1))
end)
end
end
end
end