From 4e50de3d7efc96260c0a724ad75b9257d2ddebe5 Mon Sep 17 00:00:00 2001 From: Zach Daniel Date: Mon, 6 May 2024 13:03:52 -0400 Subject: [PATCH] improvement: performance improvements --- lib/ash_csv/data_layer.ex | 153 ++++++++++++++++++++++---------------- mix.lock | 6 +- 2 files changed, 92 insertions(+), 67 deletions(-) diff --git a/lib/ash_csv/data_layer.ex b/lib/ash_csv/data_layer.ex index 67f99b8..86aad41 100644 --- a/lib/ash_csv/data_layer.ex +++ b/lib/ash_csv/data_layer.ex @@ -3,6 +3,8 @@ defmodule AshCsv.DataLayer do alias Ash.Actions.Sort + @filter_stream_size 100 + @impl true def can?(_, :read), do: true def can?(_, :create), do: true @@ -126,23 +128,7 @@ defmodule AshCsv.DataLayer do @impl true def run_query(query, resource) do - case read_file(resource) do - {:ok, results} -> - offset_records = - results - |> filter_matches(query.filter, query.domain) - |> Sort.runtime_sort(query.sort, domain: query.domain) - |> Enum.drop(query.offset || 0) - - if query.limit do - {:ok, Enum.take(offset_records, query.limit)} - else - {:ok, offset_records} - end - - {:error, error} -> - {:error, error} - end + read_file(resource, true, query.domain, query.filter, query.sort, query.offset, query.limit) rescue e in File.Error -> if create?(resource) do @@ -209,14 +195,14 @@ defmodule AshCsv.DataLayer do @impl true def update(resource, changeset) do resource - |> do_read_file() + |> read_file(false, changeset.domain) |> do_update(resource, changeset) end @impl true - def destroy(resource, %{data: record}) do + def destroy(resource, %{data: record, domain: domain}) do resource - |> do_read_file() + |> read_file(false, domain) |> do_destroy(resource, record) end @@ -450,39 +436,17 @@ defmodule AshCsv.DataLayer do end end - defp read_file(resource) do - columns = columns(resource) - - resource - |> do_read_file() - |> case do - {:ok, results} -> - do_cast_stored(results, columns, resource) - - {:error, error} -> - {:error, error} - end - end - - defp do_cast_stored(results, columns, resource) do - results - |> Enum.reduce_while({:ok, []}, fn result, {:ok, results} -> - key_vals = - columns - |> Enum.zip(result) - |> Enum.reject(fn {key, _value} -> - key == :_ - end) - - case cast_stored(resource, key_vals) do - {:ok, casted} -> {:cont, {:ok, [casted | results]}} - {:error, error} -> {:halt, {:error, error}} - end - end) - end - # sobelow_skip ["Traversal.FileModule"] - defp do_read_file(resource, retry? \\ false) do + defp read_file( + resource, + decode?, + domain, + filter \\ nil, + sort \\ nil, + offset \\ nil, + limit \\ nil, + retry? \\ false + ) do amount_to_drop = if header?(resource) do 1 @@ -490,30 +454,91 @@ defmodule AshCsv.DataLayer do 0 end - resource - |> file() - |> File.stream!() - |> Stream.drop(amount_to_drop) - |> CSV.decode(separator: separator(resource)) - |> Enum.reduce_while({:ok, []}, fn - {:ok, result}, {:ok, results} -> - {:cont, {:ok, [result | results]}} + columns = columns(resource) - {:error, error}, _ -> - {:halt, {:error, error}} - end) + results = + resource + |> file() + |> File.stream!() + |> Stream.drop(amount_to_drop) + |> CSV.decode(separator: separator(resource)) + |> then(fn csv_stream -> + if decode? do + csv_stream + |> Stream.map(fn + {:error, error} -> + throw({:error, error}) + + {:ok, row} -> + key_vals = + columns + |> Enum.zip(row) + |> Enum.reject(fn {key, _value} -> + key == :_ + end) + + case cast_stored(resource, key_vals) do + {:ok, casted} -> casted + {:error, error} -> throw({:error, error}) + end + end) + |> filter_stream(domain, filter) + |> sort_stream(resource, domain, sort) + |> offset_stream(offset) + |> limit_stream(limit) + |> Enum.to_list() + else + csv_stream + |> Stream.map(fn + {:error, error} -> + throw({:error, error}) + + {:ok, row} -> + row + end) + end + end) + + {:ok, results} rescue e in File.Error -> if e.reason == :enoent && !retry? do file = file(resource) File.mkdir_p!(Path.dirname(file)) File.write!(file(resource), header(resource)) - do_read_file(resource, true) + read_file(resource, decode?, domain, filter, sort, offset, limit, true) else reraise e, __STACKTRACE__ end + catch + {:error, error} -> + {:error, error} end + defp sort_stream(stream, _resource, _domain, sort) when sort in [nil, []] do + stream + end + + defp sort_stream(stream, resource, domain, sort) do + Sort.runtime_sort(stream, sort, domain: domain, resource: resource) + end + + defp filter_stream(stream, _domain, nil), do: stream + + defp filter_stream(stream, domain, filter) do + stream + |> Stream.chunk_every(@filter_stream_size) + |> Stream.flat_map(fn chunk -> + filter_matches(chunk, filter, domain) + end) + end + + defp offset_stream(stream, offset) when offset in [0, nil], do: stream + defp offset_stream(stream, offset), do: Stream.drop(stream, offset) + + defp limit_stream(stream, nil), do: stream + defp limit_stream(stream, limit), do: Stream.take(stream, limit) + # sobelow_skip ["Traversal.FileModule"] defp create_from_records(records, resource, changeset, retry?) do pkey = Ash.Resource.Info.primary_key(resource) diff --git a/mix.lock b/mix.lock index d99f214..15d12b1 100644 --- a/mix.lock +++ b/mix.lock @@ -25,9 +25,9 @@ "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, "reactor": {:hex, :reactor, "0.8.1", "1aec71d16083901277727c8162f6dd0f07e80f5ca98911b6ef4f2c95e6e62758", [:mix], [{:libgraph, "~> 0.16", [hex: :libgraph, repo: "hexpm", optional: false]}, {:spark, "~> 2.0", [hex: :spark, repo: "hexpm", optional: false]}, {:splode, "~> 0.2", [hex: :splode, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.2", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "ae3936d97a3e4a316744f70c77b85345b08b70da334024c26e6b5eb8ede1246b"}, "sobelow": {:hex, :sobelow, "0.13.0", "218afe9075904793f5c64b8837cc356e493d88fddde126a463839351870b8d1e", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "cd6e9026b85fc35d7529da14f95e85a078d9dd1907a9097b3ba6ac7ebbe34a0d"}, - "sourceror": {:hex, :sourceror, "1.0.2", "c5e86fdc14881f797749d1fe5df017ca66727a8146e7ee3e736605a3df78f3e6", [:mix], [], "hexpm", "832335e87d0913658f129d58b2a7dc0490ddd4487b02de6d85bca0169ec2bd79"}, - "spark": {:hex, :spark, "2.1.11", "8093149dfd583b5ce2c06e1fea1faaf4125b50e4703138b2cbefb78c8f4aa07f", [:mix], [{:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:sourceror, "~> 1.0", [hex: :sourceror, repo: "hexpm", optional: false]}], "hexpm", "1877d92ab993b860e9d828bfd72d50367c0d3a53dd84f4de5d221baf66ae8723"}, - "splode": {:hex, :splode, "0.2.1", "020079ec06c9e00f8b6586852e781b5e07aee6ba588f3f45dd993831c87b0511", [:mix], [], "hexpm", "d232a933666061fe1f659d9906042fa94b9b393bb1129a4fde6fa680033b2611"}, + "sourceror": {:hex, :sourceror, "1.0.3", "111711c147f4f1414c07a67b45ad0064a7a41569037355407eda635649507f1d", [:mix], [], "hexpm", "56c21ef146c00b51bc3bb78d1f047cb732d193256a7c4ba91eaf828d3ae826af"}, + "spark": {:hex, :spark, "2.1.20", "204db8fd28378783c28a9dcb0bebdaf1d51b14a9ea106e1080457d29510a66ea", [:mix], [{:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:sourceror, "~> 1.0", [hex: :sourceror, repo: "hexpm", optional: false]}], "hexpm", "e7a4f8f8ca7a477918af1eb65e20f2015f783a9a23e5f73d1020edf5b2ef69be"}, + "splode": {:hex, :splode, "0.2.4", "71046334c39605095ca4bed5d008372e56454060997da14f9868534c17b84b53", [:mix], [], "hexpm", "ca3b95f0d8d4b482b5357954fec857abd0fa3ea509d623334c1328e7382044c2"}, "stream_data": {:hex, :stream_data, "0.6.0", "e87a9a79d7ec23d10ff83eb025141ef4915eeb09d4491f79e52f2562b73e5f47", [:mix], [], "hexpm", "b92b5031b650ca480ced047578f1d57ea6dd563f5b57464ad274718c9c29501c"}, "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, "typable": {:hex, :typable, "0.3.0", "0431e121d124cd26f312123e313d2689b9a5322b15add65d424c07779eaa3ca1", [:mix], [], "hexpm", "880a0797752da1a4c508ac48f94711e04c86156f498065a83d160eef945858f8"},