mirror of
https://github.com/ash-project/ash_csv.git
synced 2024-09-19 12:53:31 +12:00
improvement: performance improvements
This commit is contained in:
parent
1331ca35ac
commit
4e50de3d7e
2 changed files with 92 additions and 67 deletions
|
@ -3,6 +3,8 @@ defmodule AshCsv.DataLayer do
|
|||
|
||||
alias Ash.Actions.Sort
|
||||
|
||||
@filter_stream_size 100
|
||||
|
||||
@impl true
|
||||
def can?(_, :read), do: true
|
||||
def can?(_, :create), do: true
|
||||
|
@ -126,23 +128,7 @@ defmodule AshCsv.DataLayer do
|
|||
|
||||
@impl true
|
||||
def run_query(query, resource) do
|
||||
case read_file(resource) do
|
||||
{:ok, results} ->
|
||||
offset_records =
|
||||
results
|
||||
|> filter_matches(query.filter, query.domain)
|
||||
|> Sort.runtime_sort(query.sort, domain: query.domain)
|
||||
|> Enum.drop(query.offset || 0)
|
||||
|
||||
if query.limit do
|
||||
{:ok, Enum.take(offset_records, query.limit)}
|
||||
else
|
||||
{:ok, offset_records}
|
||||
end
|
||||
|
||||
{:error, error} ->
|
||||
{:error, error}
|
||||
end
|
||||
read_file(resource, true, query.domain, query.filter, query.sort, query.offset, query.limit)
|
||||
rescue
|
||||
e in File.Error ->
|
||||
if create?(resource) do
|
||||
|
@ -209,14 +195,14 @@ defmodule AshCsv.DataLayer do
|
|||
@impl true
|
||||
def update(resource, changeset) do
|
||||
resource
|
||||
|> do_read_file()
|
||||
|> read_file(false, changeset.domain)
|
||||
|> do_update(resource, changeset)
|
||||
end
|
||||
|
||||
@impl true
|
||||
def destroy(resource, %{data: record}) do
|
||||
def destroy(resource, %{data: record, domain: domain}) do
|
||||
resource
|
||||
|> do_read_file()
|
||||
|> read_file(false, domain)
|
||||
|> do_destroy(resource, record)
|
||||
end
|
||||
|
||||
|
@ -450,39 +436,17 @@ defmodule AshCsv.DataLayer do
|
|||
end
|
||||
end
|
||||
|
||||
defp read_file(resource) do
|
||||
columns = columns(resource)
|
||||
|
||||
resource
|
||||
|> do_read_file()
|
||||
|> case do
|
||||
{:ok, results} ->
|
||||
do_cast_stored(results, columns, resource)
|
||||
|
||||
{:error, error} ->
|
||||
{:error, error}
|
||||
end
|
||||
end
|
||||
|
||||
defp do_cast_stored(results, columns, resource) do
|
||||
results
|
||||
|> Enum.reduce_while({:ok, []}, fn result, {:ok, results} ->
|
||||
key_vals =
|
||||
columns
|
||||
|> Enum.zip(result)
|
||||
|> Enum.reject(fn {key, _value} ->
|
||||
key == :_
|
||||
end)
|
||||
|
||||
case cast_stored(resource, key_vals) do
|
||||
{:ok, casted} -> {:cont, {:ok, [casted | results]}}
|
||||
{:error, error} -> {:halt, {:error, error}}
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
# sobelow_skip ["Traversal.FileModule"]
|
||||
defp do_read_file(resource, retry? \\ false) do
|
||||
defp read_file(
|
||||
resource,
|
||||
decode?,
|
||||
domain,
|
||||
filter \\ nil,
|
||||
sort \\ nil,
|
||||
offset \\ nil,
|
||||
limit \\ nil,
|
||||
retry? \\ false
|
||||
) do
|
||||
amount_to_drop =
|
||||
if header?(resource) do
|
||||
1
|
||||
|
@ -490,30 +454,91 @@ defmodule AshCsv.DataLayer do
|
|||
0
|
||||
end
|
||||
|
||||
resource
|
||||
|> file()
|
||||
|> File.stream!()
|
||||
|> Stream.drop(amount_to_drop)
|
||||
|> CSV.decode(separator: separator(resource))
|
||||
|> Enum.reduce_while({:ok, []}, fn
|
||||
{:ok, result}, {:ok, results} ->
|
||||
{:cont, {:ok, [result | results]}}
|
||||
columns = columns(resource)
|
||||
|
||||
{:error, error}, _ ->
|
||||
{:halt, {:error, error}}
|
||||
end)
|
||||
results =
|
||||
resource
|
||||
|> file()
|
||||
|> File.stream!()
|
||||
|> Stream.drop(amount_to_drop)
|
||||
|> CSV.decode(separator: separator(resource))
|
||||
|> then(fn csv_stream ->
|
||||
if decode? do
|
||||
csv_stream
|
||||
|> Stream.map(fn
|
||||
{:error, error} ->
|
||||
throw({:error, error})
|
||||
|
||||
{:ok, row} ->
|
||||
key_vals =
|
||||
columns
|
||||
|> Enum.zip(row)
|
||||
|> Enum.reject(fn {key, _value} ->
|
||||
key == :_
|
||||
end)
|
||||
|
||||
case cast_stored(resource, key_vals) do
|
||||
{:ok, casted} -> casted
|
||||
{:error, error} -> throw({:error, error})
|
||||
end
|
||||
end)
|
||||
|> filter_stream(domain, filter)
|
||||
|> sort_stream(resource, domain, sort)
|
||||
|> offset_stream(offset)
|
||||
|> limit_stream(limit)
|
||||
|> Enum.to_list()
|
||||
else
|
||||
csv_stream
|
||||
|> Stream.map(fn
|
||||
{:error, error} ->
|
||||
throw({:error, error})
|
||||
|
||||
{:ok, row} ->
|
||||
row
|
||||
end)
|
||||
end
|
||||
end)
|
||||
|
||||
{:ok, results}
|
||||
rescue
|
||||
e in File.Error ->
|
||||
if e.reason == :enoent && !retry? do
|
||||
file = file(resource)
|
||||
File.mkdir_p!(Path.dirname(file))
|
||||
File.write!(file(resource), header(resource))
|
||||
do_read_file(resource, true)
|
||||
read_file(resource, decode?, domain, filter, sort, offset, limit, true)
|
||||
else
|
||||
reraise e, __STACKTRACE__
|
||||
end
|
||||
catch
|
||||
{:error, error} ->
|
||||
{:error, error}
|
||||
end
|
||||
|
||||
defp sort_stream(stream, _resource, _domain, sort) when sort in [nil, []] do
|
||||
stream
|
||||
end
|
||||
|
||||
defp sort_stream(stream, resource, domain, sort) do
|
||||
Sort.runtime_sort(stream, sort, domain: domain, resource: resource)
|
||||
end
|
||||
|
||||
defp filter_stream(stream, _domain, nil), do: stream
|
||||
|
||||
defp filter_stream(stream, domain, filter) do
|
||||
stream
|
||||
|> Stream.chunk_every(@filter_stream_size)
|
||||
|> Stream.flat_map(fn chunk ->
|
||||
filter_matches(chunk, filter, domain)
|
||||
end)
|
||||
end
|
||||
|
||||
defp offset_stream(stream, offset) when offset in [0, nil], do: stream
|
||||
defp offset_stream(stream, offset), do: Stream.drop(stream, offset)
|
||||
|
||||
defp limit_stream(stream, nil), do: stream
|
||||
defp limit_stream(stream, limit), do: Stream.take(stream, limit)
|
||||
|
||||
# sobelow_skip ["Traversal.FileModule"]
|
||||
defp create_from_records(records, resource, changeset, retry?) do
|
||||
pkey = Ash.Resource.Info.primary_key(resource)
|
||||
|
|
6
mix.lock
6
mix.lock
|
@ -25,9 +25,9 @@
|
|||
"nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"},
|
||||
"reactor": {:hex, :reactor, "0.8.1", "1aec71d16083901277727c8162f6dd0f07e80f5ca98911b6ef4f2c95e6e62758", [:mix], [{:libgraph, "~> 0.16", [hex: :libgraph, repo: "hexpm", optional: false]}, {:spark, "~> 2.0", [hex: :spark, repo: "hexpm", optional: false]}, {:splode, "~> 0.2", [hex: :splode, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.2", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "ae3936d97a3e4a316744f70c77b85345b08b70da334024c26e6b5eb8ede1246b"},
|
||||
"sobelow": {:hex, :sobelow, "0.13.0", "218afe9075904793f5c64b8837cc356e493d88fddde126a463839351870b8d1e", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "cd6e9026b85fc35d7529da14f95e85a078d9dd1907a9097b3ba6ac7ebbe34a0d"},
|
||||
"sourceror": {:hex, :sourceror, "1.0.2", "c5e86fdc14881f797749d1fe5df017ca66727a8146e7ee3e736605a3df78f3e6", [:mix], [], "hexpm", "832335e87d0913658f129d58b2a7dc0490ddd4487b02de6d85bca0169ec2bd79"},
|
||||
"spark": {:hex, :spark, "2.1.11", "8093149dfd583b5ce2c06e1fea1faaf4125b50e4703138b2cbefb78c8f4aa07f", [:mix], [{:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:sourceror, "~> 1.0", [hex: :sourceror, repo: "hexpm", optional: false]}], "hexpm", "1877d92ab993b860e9d828bfd72d50367c0d3a53dd84f4de5d221baf66ae8723"},
|
||||
"splode": {:hex, :splode, "0.2.1", "020079ec06c9e00f8b6586852e781b5e07aee6ba588f3f45dd993831c87b0511", [:mix], [], "hexpm", "d232a933666061fe1f659d9906042fa94b9b393bb1129a4fde6fa680033b2611"},
|
||||
"sourceror": {:hex, :sourceror, "1.0.3", "111711c147f4f1414c07a67b45ad0064a7a41569037355407eda635649507f1d", [:mix], [], "hexpm", "56c21ef146c00b51bc3bb78d1f047cb732d193256a7c4ba91eaf828d3ae826af"},
|
||||
"spark": {:hex, :spark, "2.1.20", "204db8fd28378783c28a9dcb0bebdaf1d51b14a9ea106e1080457d29510a66ea", [:mix], [{:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:sourceror, "~> 1.0", [hex: :sourceror, repo: "hexpm", optional: false]}], "hexpm", "e7a4f8f8ca7a477918af1eb65e20f2015f783a9a23e5f73d1020edf5b2ef69be"},
|
||||
"splode": {:hex, :splode, "0.2.4", "71046334c39605095ca4bed5d008372e56454060997da14f9868534c17b84b53", [:mix], [], "hexpm", "ca3b95f0d8d4b482b5357954fec857abd0fa3ea509d623334c1328e7382044c2"},
|
||||
"stream_data": {:hex, :stream_data, "0.6.0", "e87a9a79d7ec23d10ff83eb025141ef4915eeb09d4491f79e52f2562b73e5f47", [:mix], [], "hexpm", "b92b5031b650ca480ced047578f1d57ea6dd563f5b57464ad274718c9c29501c"},
|
||||
"telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},
|
||||
"typable": {:hex, :typable, "0.3.0", "0431e121d124cd26f312123e313d2689b9a5322b15add65d424c07779eaa3ca1", [:mix], [], "hexpm", "880a0797752da1a4c508ac48f94711e04c86156f498065a83d160eef945858f8"},
|
||||
|
|
Loading…
Reference in a new issue