Commit b3ef376b authored by Robbert Krebbers's avatar Robbert Krebbers
Browse files

Tweak.

parent 5343f4be
......@@ -29,6 +29,66 @@ Instance: Params (@group_insert) 5.
Instance: Params (@group) 3.
Instance: Params (@group) 7.
(** Distributed version *)
Definition par_map_reduce_map_server : val :=
rec: "go" "n" "cmap" "csort" "xs" :=
if: "n" = #0 then #() else
if: recv "cmap" then (* send item to mapper *)
if: lisnil "xs" then
send "cmap" #false;;
"go" ("n" - #1) "cmap" "csort" "xs"
else
send "cmap" #true;;
send "cmap" (lhead "xs");;
"go" "n" "cmap" "csort" (ltail "xs")
else (* receive item from mapper *)
let: "zs" := recv "cmap" in
send_all "csort" "zs";;
"go" "n" "cmap" "csort" "xs".
Definition par_map_reduce_collect : val :=
rec: "go" "csort" "i" "ys" :=
if: ~recv "csort" then (("i", "ys"), NONE) else
let: "jy" := recv "csort" in
let: "j" := Fst "jy" in let: "y" := Snd "jy" in
if: "i" = "j" then "go" "csort" "j" (lcons "y" "ys") else
(("i", "ys"), SOME ("j", "y")).
Definition par_map_reduce_reduce_server : val :=
rec: "go" "n" "csort" "cred" "acc" "zs" :=
if: "n" = #0 then "zs" else
if: recv "cred" then (* Send item to mapper *)
match: "acc" with
NONE =>
(* nothing left *)
send "cred" #false;; "go" ("n" - #1) "csort" "cred" NONE "zs"
| SOME "acc" =>
(* Read subsequent items with the same key *)
let: "grp" := par_map_reduce_collect "csort"
(Fst "acc") (lcons (Snd "acc") (lnil #())) in
send "cred" #true;;
send "cred" (Fst "grp");;
"go" "n" "csort" "cred" (Snd "grp") "zs"
end
else (* receive item from mapper *)
let: "zs'" := recv "cred" in
"go" "n" "csort" "cred" "acc" (lapp "zs'" "zs").
Definition cmpZfst : val := λ: "x" "y", Fst "x" Fst "y".
Definition par_map_reduce : val := λ: "n" "map" "red" "xs",
let: "cmap" := start_map_service "n" "map" in
let: "csort" := start_chan (λ: "c", sort_elem_service cmpZfst "c") in
par_map_reduce_map_server "n" "cmap" "csort" "xs";;
send "csort" #stop;;
let: "cred" := start_map_service "n" "red" in
(* We need the first sorted element in the loop to compare subsequent elements *)
if: ~recv "csort" then lnil #() else (* Handle the empty case *)
let: "jy" := recv "csort" in
par_map_reduce_reduce_server "n" "csort" "cred" (SOME "jy") (lnil #()).
(** Properties about the functional version *)
Local Infix "≡ₚₚ" :=
(PermutationA (prod_relation (=) ())) (at level 70, no associativity) : stdpp_scope.
......@@ -141,63 +201,6 @@ Section map_reduce.
Proof. intros xs1 xs2 Hxs. by rewrite /map_reduce /= Hxs. Qed.
End map_reduce.
(** Distributed version *)
Definition par_map_reduce_map_server : val :=
rec: "go" "n" "cmap" "csort" "xs" :=
if: "n" = #0 then #() else
if: recv "cmap" then (* send item to mapper *)
if: lisnil "xs" then
send "cmap" #false;;
"go" ("n" - #1) "cmap" "csort" "xs"
else
send "cmap" #true;;
send "cmap" (lhead "xs");;
"go" "n" "cmap" "csort" (ltail "xs")
else (* receive item from mapper *)
let: "zs" := recv "cmap" in
send_all "csort" "zs";;
"go" "n" "cmap" "csort" "xs".
Definition par_map_reduce_collect : val :=
rec: "go" "csort" "i" "ys" :=
if: ~recv "csort" then (("i", "ys"), NONE) else
let: "jy" := recv "csort" in
let: "j" := Fst "jy" in let: "y" := Snd "jy" in
if: "i" = "j" then "go" "csort" "j" (lcons "y" "ys") else
(("i", "ys"), SOME ("j", "y")).
Definition par_map_reduce_reduce_server : val :=
rec: "go" "n" "csort" "cred" "acc" "zs" :=
if: "n" = #0 then "zs" else
if: recv "cred" then (* Send item to mapper *)
match: "acc" with
NONE =>
(* nothing left *)
send "cred" #false;; "go" ("n" - #1) "csort" "cred" NONE "zs"
| SOME "acc" =>
(* Read subsequent items with the same key *)
let: "grp" := par_map_reduce_collect "csort"
(Fst "acc") (lcons (Snd "acc") (lnil #())) in
send "cred" #true;;
send "cred" (Fst "grp");;
"go" "n" "csort" "cred" (Snd "grp") "zs"
end
else (* receive item from mapper *)
let: "zs'" := recv "cred" in
"go" "n" "csort" "cred" "acc" (lapp "zs'" "zs").
Definition cmpZfst : val := λ: "x" "y", Fst "x" Fst "y".
Definition par_map_reduce : val := λ: "n" "map" "red" "xs",
let: "cmap" := start_map_service "n" "map" in
let: "csort" := start_chan (λ: "c", sort_elem_service cmpZfst "c") in
par_map_reduce_map_server "n" "cmap" "csort" "xs";;
send "csort" #stop;;
let: "cred" := start_map_service "n" "red" in
(* We need the first sorted element in the loop to compare subsequent elements *)
if: ~recv "csort" then lnil #() else (* Handle the empty case *)
let: "jy" := recv "csort" in
par_map_reduce_reduce_server "n" "csort" "cred" (SOME "jy") (lnil #()).
(** Correctness proofs of the distributed version *)
Class map_reduceG Σ A B `{Countable A, Countable B} := {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment