using Distributed

# First, initialize remote nodes with all the libraries we'll be using. For
# macros to expand correctly, this code block must be separate from the bulk of
# the code further down.
@everywhere begin
using CSV
using ZipFile
using Dates
using DataFrames
using DataSets
using Underscores
end

# Next, define utility functions we'll need on the nodes.
@everywhere begin

# Read one zipped TrueFX tick-data blob into a DataFrame.
#
# `csv_blob` is a DataSets blob whose content is a zip archive expected to
# contain exactly one CSV file (`only` throws otherwise). The CSV has no
# header row; columns are pair name, timestamp, bid price, ask price.
function load_truefx_csv(csv_blob)
    @info "Loading $(basename(csv_blob))"
    open(IO, csv_blob) do io
        archive = ZipFile.Reader(io)
        raw_bytes = read(only(archive.files))
        # Parse the decompressed bytes directly; timestamps look like
        # "20200101 17:00:00.000".
        CSV.read(raw_bytes, DataFrame,
                 header=["pair", "timestamp", "bid", "ask"],
                 dateformat=dateformat"yyyymmdd H:M:S.s",
                 types=Dict(:timestamp=>DateTime))
    end
end

# Compute open-high-low-close of the `bid_or_ask` price column, partitioning
# rows via `grouping` applied to the timestamp column (defaults to flooring
# each timestamp to the start of its day).
function compute_ohlc(data, bid_or_ask=:bid, grouping=t->floor.(t, Dates.Day(1)))
    # Bucket timestamps, then group by (pair, bucket) and summarize each group.
    bucketed = transform(data, :timestamp=>grouping=>:timestamp)
    by_pair_and_bucket = groupby(bucketed, [:pair, :timestamp])
    return combine(by_pair_and_bucket,
                   bid_or_ask=>minimum=>:low, bid_or_ask=>maximum=>:high,
                   bid_or_ask=>first=>:open,  bid_or_ask=>last=>:close)
end

# Process a subset of the files in the dataset on each Distributed worker.
# `subset_range` indexes into the collected blob list; the per-file OHLC
# summaries are vertically concatenated into a single table.
function process_data_subset(dataset_name, subset_range)
    open(BlobTree, dataset(dataset_name)) do data_tree
        selected = collect(data_tree)[subset_range]
        tables = map(load_truefx_csv, selected)
        reduce(vcat, map(compute_ohlc, tables))
    end
end

end

#-------------------------------------------------------------------------------
# Script section

# Load Data.toml if not on JuliaHub (JRUN_NAME is only set there). A future
# version of DataSets.jl will make this step unnecessary.
haskey(ENV, "JRUN_NAME") || @everywhere DataSets.load_project!(path"../Data.toml")

# Distributed version: split the file indices into one contiguous chunk per
# worker, compute per-chunk OHLC tables via pmap, and concatenate the results.
dataset_name = "currency_data"
num_files = open(length∘collect, BlobTree, dataset(dataset_name))
# Use ceiling division (`cld`) so that at most `nworkers()` chunks are created,
# and clamp to at least 1: with floor division (`÷`) the chunk size is 0 when
# there are more workers than files, which makes `Iterators.partition` throw
# an ArgumentError, and non-divisible counts leave an unbalanced tail chunk.
chunk_size = max(1, cld(num_files, nworkers()))
work_partition = Iterators.partition(1:num_files, chunk_size)
ohlc_data = reduce(vcat, pmap(r->process_data_subset(dataset_name, r), work_partition))

# Persist the combined table and advertise it as the job result.
CSV.write("ohlc_data.csv", ohlc_data)
ENV["RESULTS_FILE"] = "ohlc_data.csv"

