Files
2024-11-29 19:15:35 +01:00

90 lines
2.4 KiB
Elixir

defmodule Mobilizon.Storage.Page do
@moduledoc """
Module for pagination of queries.
"""
import Ecto.Query
alias Mobilizon.Storage.Repo
defstruct [
:total,
:elements
]
@type t(structure) :: %__MODULE__{
total: integer,
elements: list(structure)
}
@doc """
Returns a Page struct for a query.
"""
@spec build_page(Ecto.Queryable.t(), integer | nil, integer) :: t(any)
def build_page(query, page, limit) do
count_query =
query
# Exclude select because we add a new one below
|> exclude(:select)
# Exclude order_by for perf
|> exclude(:order_by)
# Exclude preloads to avoid error "cannot preload associations in subquery"
|> exclude(:preload)
|> subquery()
|> select([r], count(fragment("*")))
[total, elements] =
[
fn -> Repo.one(count_query) end,
fn -> Repo.all(paginate(query, page, limit)) end
]
|> Enum.map(&Task.async/1)
|> Enum.map(&Task.await(&1, 30_000))
%__MODULE__{total: total, elements: elements}
end
@doc """
Add limit and offset to the query.
"""
@spec paginate(Ecto.Queryable.t() | struct, integer | nil, integer | nil) :: Ecto.Query.t()
def paginate(query, page \\ 1, size \\ 10)
def paginate(query, page, _size) when is_nil(page), do: paginate(query)
def paginate(query, page, size) when is_nil(size), do: paginate(query, page)
def paginate(query, page, size) do
from(query, limit: ^size, offset: ^((page - 1) * size))
end
@doc """
Stream chunks of results from the given queryable.
Unlike Repo.stream, this function does not keep a long running transaction open.
Hence, consistency is not guarenteed in the presence of rows being deleted or sort criteria changing.
## Example
Ecto.Query.from(u in Users, order_by: [asc: :created_at])
|> Repo.chunk(100)
|> Stream.map(&process_batch_of_users)
|> Stream.run()
## Source
https://elixirforum.com/t/what-is-the-best-approach-for-fetching-large-amount-of-records-from-postgresql-with-ecto/3766/8
"""
@spec chunk(Ecto.Queryable.t(), integer) :: Stream.t()
def chunk(queryable, chunk_size) do
chunk_stream =
Stream.unfold(1, fn page_number ->
page = queryable |> paginate(page_number, chunk_size) |> Repo.all()
{page, page_number + 1}
end)
Stream.take_while(chunk_stream, fn
[] -> false
_ -> true
end)
end
end