diff --git a/theories/examples/map_reduce.v b/theories/examples/map_reduce.v index cac7f2b1d19bd402eecb59dd15b4fd6e7378d9ec..4a015a22ce6485dd87aeafe94adbd0eec856d449 100644 --- a/theories/examples/map_reduce.v +++ b/theories/examples/map_reduce.v @@ -29,6 +29,66 @@ Instance: Params (@group_insert) 5. Instance: Params (@group) 3. Instance: Params (@group) 7. + +(** Distributed version *) +Definition par_map_reduce_map_server : val := + rec: "go" "n" "cmap" "csort" "xs" := + if: "n" = #0 then #() else + if: recv "cmap" then (* send item to mapper *) + if: lisnil "xs" then + send "cmap" #false;; + "go" ("n" - #1) "cmap" "csort" "xs" + else + send "cmap" #true;; + send "cmap" (lhead "xs");; + "go" "n" "cmap" "csort" (ltail "xs") + else (* receive item from mapper *) + let: "zs" := recv "cmap" in + send_all "csort" "zs";; + "go" "n" "cmap" "csort" "xs". + +Definition par_map_reduce_collect : val := + rec: "go" "csort" "i" "ys" := + if: ~recv "csort" then (("i", "ys"), NONE) else + let: "jy" := recv "csort" in + let: "j" := Fst "jy" in let: "y" := Snd "jy" in + if: "i" = "j" then "go" "csort" "j" (lcons "y" "ys") else + (("i", "ys"), SOME ("j", "y")). + +Definition par_map_reduce_reduce_server : val := + rec: "go" "n" "csort" "cred" "acc" "zs" := + if: "n" = #0 then "zs" else + if: recv "cred" then (* Send item to mapper *) + match: "acc" with + NONE => + (* nothing left *) + send "cred" #false;; "go" ("n" - #1) "csort" "cred" NONE "zs" + | SOME "acc" => + (* Read subsequent items with the same key *) + let: "grp" := par_map_reduce_collect "csort" + (Fst "acc") (lcons (Snd "acc") (lnil #())) in + send "cred" #true;; + send "cred" (Fst "grp");; + "go" "n" "csort" "cred" (Snd "grp") "zs" + end + else (* receive item from mapper *) + let: "zs'" := recv "cred" in + "go" "n" "csort" "cred" "acc" (lapp "zs'" "zs"). + +Definition cmpZfst : val := λ: "x" "y", Fst "x" ≤ Fst "y". + +Definition par_map_reduce : val := λ: "n" "map" "red" "xs", + let: "cmap" := start_map_service "n" "map" in + let: "csort" := start_chan (λ: "c", sort_elem_service cmpZfst "c") in + par_map_reduce_map_server "n" "cmap" "csort" "xs";; + send "csort" #stop;; + let: "cred" := start_map_service "n" "red" in + (* We need the first sorted element in the loop to compare subsequent elements *) + if: ~recv "csort" then lnil #() else (* Handle the empty case *) + let: "jy" := recv "csort" in + par_map_reduce_reduce_server "n" "csort" "cred" (SOME "jy") (lnil #()). + + (** Properties about the functional version *) Local Infix "≡ₚₚ" := (PermutationA (prod_relation (=) (≡ₚ))) (at level 70, no associativity) : stdpp_scope. @@ -141,63 +201,6 @@ Section map_reduce. Proof. intros xs1 xs2 Hxs. by rewrite /map_reduce /= Hxs. Qed. End map_reduce. -(** Distributed version *) -Definition par_map_reduce_map_server : val := - rec: "go" "n" "cmap" "csort" "xs" := - if: "n" = #0 then #() else - if: recv "cmap" then (* send item to mapper *) - if: lisnil "xs" then - send "cmap" #false;; - "go" ("n" - #1) "cmap" "csort" "xs" - else - send "cmap" #true;; - send "cmap" (lhead "xs");; - "go" "n" "cmap" "csort" (ltail "xs") - else (* receive item from mapper *) - let: "zs" := recv "cmap" in - send_all "csort" "zs";; - "go" "n" "cmap" "csort" "xs". - -Definition par_map_reduce_collect : val := - rec: "go" "csort" "i" "ys" := - if: ~recv "csort" then (("i", "ys"), NONE) else - let: "jy" := recv "csort" in - let: "j" := Fst "jy" in let: "y" := Snd "jy" in - if: "i" = "j" then "go" "csort" "j" (lcons "y" "ys") else - (("i", "ys"), SOME ("j", "y")). - -Definition par_map_reduce_reduce_server : val := - rec: "go" "n" "csort" "cred" "acc" "zs" := - if: "n" = #0 then "zs" else - if: recv "cred" then (* Send item to mapper *) - match: "acc" with - NONE => - (* nothing left *) - send "cred" #false;; "go" ("n" - #1) "csort" "cred" NONE "zs" - | SOME "acc" => - (* Read subsequent items with the same key *) - let: "grp" := par_map_reduce_collect "csort" - (Fst "acc") (lcons (Snd "acc") (lnil #())) in - send "cred" #true;; - send "cred" (Fst "grp");; - "go" "n" "csort" "cred" (Snd "grp") "zs" - end - else (* receive item from mapper *) - let: "zs'" := recv "cred" in - "go" "n" "csort" "cred" "acc" (lapp "zs'" "zs"). - -Definition cmpZfst : val := λ: "x" "y", Fst "x" ≤ Fst "y". - -Definition par_map_reduce : val := λ: "n" "map" "red" "xs", - let: "cmap" := start_map_service "n" "map" in - let: "csort" := start_chan (λ: "c", sort_elem_service cmpZfst "c") in - par_map_reduce_map_server "n" "cmap" "csort" "xs";; - send "csort" #stop;; - let: "cred" := start_map_service "n" "red" in - (* We need the first sorted element in the loop to compare subsequent elements *) - if: ~recv "csort" then lnil #() else (* Handle the empty case *) - let: "jy" := recv "csort" in - par_map_reduce_reduce_server "n" "csort" "cred" (SOME "jy") (lnil #()). (** Correctness proofs of the distributed version *) Class map_reduceG Σ A B `{Countable A, Countable B} := {