From stdpp Require Import sorting.
From actris.channel Require Import proto_channel proofmode.
From iris.heap_lang Require Import proofmode notation.
From actris.utils Require Import llist compare contribution.
From actris.examples Require Import map sort_fg_client.
From iris.algebra Require Import gmultiset.
From Coq Require Import SetoidPermutation.

(** Functional version of map reduce (aka the specification) *)
(** [group_insert i x ixss] adds the value [x] under key [i] to the list of
groups [ixss]. The list is scanned from the front: at the first group whose
key [j] equals [i], [x] is consed onto that group's values; if no group has
key [i], a new singleton group [(i,[x])] is appended at the end. *)
Fixpoint group_insert {A} `{EqDecision K} (i : K) (x : A)
    (ixss : list (K * list A)) : list (K * list A) :=
  match ixss with
  | [] => [(i,[x])]
  | (j,xs) :: ixss =>
     if decide (i = j) then (j,x::xs) :: ixss else (j,xs) :: group_insert i x ixss
  end.

(** [group ixs] turns a list of key/value pairs into a list of groups, one per
key. The tail is grouped first, and the head pair is then added with
[group_insert]. *)
Fixpoint group {A} `{EqDecision K} (ixs : list (K * A)) : list (K * list A) :=
  match ixs with
  | [] => []
  | (i,x) :: ixs => group_insert i x (group ixs)
  end.

(** Functional map-reduce, used as the specification of the distributed
version. It is the composition of three steps:
- [mbind map] applies [map] to every input and concatenates the resulting
  key/value pairs;
- [group] collects the values per key;
- [mbind (curry red)] applies [red] to every (key, values) group and
  concatenates the results. *)
Definition map_reduce {A B C} `{EqDecision K}
    (map : A → list (K * B)) (red : K → list B → list C) : list A → list C :=
  mbind (curry red) ∘ group ∘ mbind map.

(** [Params] hints for setoid rewriting: the number of leading arguments of
each function that are treated as fixed parameters.
- [group_insert]: [A], [K], [EqDecision K], [i], [x] (5);
- [group]: [A], [K], [EqDecision K] (3);
- [map_reduce]: [A], [B], [C], [K], [EqDecision K], [map], [red] (7).
The last hint is for [map_reduce]: [group] only has four arguments, so an
arity of 7 cannot apply to it. *)
Instance: Params (@group_insert) 5.
Instance: Params (@group) 3.
Instance: Params (@map_reduce) 7.

(** Distributed version *)
(** Server loop for the map phase.

Arguments: ["n"] is the counter that ends the loop when it reaches [#0],
["cmap"] is the channel to the map service, ["csort"] is the channel to the
sort service, and ["xs"] is the linked list of remaining inputs.

Each iteration first receives a boolean from ["cmap"]:
- [true]: an item is requested. If ["xs"] is empty, answer [#false] and
  continue with ["n" - #1]; otherwise answer [#true], send the popped head of
  ["xs"], and continue with the same ["n"].
- [false]: a result follows. Receive it from ["cmap"], forward all of its
  elements to ["csort"] with [send_all], and continue. *)
Definition par_map_reduce_map_server : val :=
  rec: "go" "n" "cmap" "csort" "xs" :=
    if: "n" = #0 then #() else
    if: recv "cmap" then (* send item to mapper *)
      if: lisnil "xs" then
        send "cmap" #false;;
        "go" ("n" - #1) "cmap" "csort" "xs"
      else
        send "cmap" #true;;
        send "cmap" (lpop "xs");;
        "go" "n" "cmap" "csort" "xs"
    else (* receive item from mapper *)
      let: "zs" := recv "cmap" in
      send_all "csort" "zs";;
      "go" "n" "cmap" "csort" "xs".

(** Collects from ["csort"] all subsequent values whose key equals ["i"],
consing each of them onto the linked list ["ys"].

Each iteration receives a boolean from ["csort"]:
- [false]: no more elements; the result is [NONE].
- [true]: a pair ["jy"] follows. If its key ["j"] equals ["i"], its value is
  consed onto ["ys"] and the loop continues; otherwise the loop stops and
  returns [SOME ("j", "y")], i.e. the first pair with a different key. *)
Definition par_map_reduce_collect : val :=
  rec: "go" "csort" "i" "ys" :=
    if: ~recv "csort" then NONE else
    let: "jy" := recv "csort" in
    let: "j" := Fst "jy" in let: "y" := Snd "jy" in
    if: "i" = "j" then lcons "y" "ys";; "go" "csort" "j" "ys" else
    SOME ("j", "y").

(** Server loop for the reduce phase.

Arguments: ["n"] is the counter that ends the loop when it reaches [#0],
["csort"] is the channel from which sorted pairs are read, ["cred"] is the
channel to the reduce service, ["acc"] is an optional pair that has already
been read from ["csort"] but not yet handed out, and ["zs"] is the linked list
that accumulates results.

Each iteration first receives a boolean from ["cred"]:
- [true]: a work item is requested. If ["acc"] is [NONE], answer [#false] and
  continue with ["n" - #1]. If it is [SOME "acc"], build a fresh list holding
  the value of ["acc"], extend it with [par_map_reduce_collect] (which also
  yields the next accumulator), answer [#true], and send the key together with
  the collected list.
- [false]: a result follows. Receive it from ["cred"], prepend it to ["zs"]
  with [lprep], and continue. *)
Definition par_map_reduce_reduce_server : val :=
  rec: "go" "n" "csort" "cred" "acc" "zs" :=
    if: "n" = #0 then #() else
    if: recv "cred" then (* send item to reducer *)
      match: "acc" with
        NONE =>
         (* nothing left *)
         send "cred" #false;; "go" ("n" - #1) "csort" "cred" NONE "zs"
      | SOME "acc" =>
         (* Read subsequent items with the same key *)
         let: "ys" := lnil #() in lcons (Snd "acc") "ys";;
         let: "new_acc" := par_map_reduce_collect "csort" (Fst "acc") "ys" in
         send "cred" #true;;
         send "cred" (Fst "acc", "ys");;
         "go" "n" "csort" "cred" "new_acc" "zs"
      end
    else (* receive item from reducer *)
      let: "zs'" := recv "cred" in
      lprep "zs" "zs'";;
      "go" "n" "csort" "cred" "acc" "zs".

(** Comparison function handed to the sort service: compares two pairs by
their first components using [≤]. *)
Definition cmpZfst : val := λ: "x" "y", Fst "x" ≤ Fst "y".

(** Entry point of the distributed map-reduce.

Steps, in order:
1. start a map service for ["map"] and a sort service that compares with
   [cmpZfst];
2. run [par_map_reduce_map_server], which feeds ["xs"] to the mappers and
   forwards their output to the sorter;
3. send [#stop] to the sorter;
4. start a map service for ["red"];
5. read the first sorted element, if there is one, and run
   [par_map_reduce_reduce_server] with it as the initial accumulator.

The result is a linked list: a fresh empty one if the sorter reports that it
has no elements, and otherwise the list ["zs"] filled in by the reduce
server. *)
Definition par_map_reduce : val := λ: "n" "map" "red" "xs",
  let: "cmap" := start_map_service "n" "map" in
  let: "csort" := start_chan (λ: "c", sort_service_fg cmpZfst "c") in
  par_map_reduce_map_server "n" "cmap" "csort" "xs";;
  send "csort" #stop;;
  let: "cred" := start_map_service "n" "red" in
  (* We need the first sorted element in the loop to compare subsequent elements *)
  if: ~recv "csort" then lnil #() else (* Handle the empty case *)
  let: "jy" := recv "csort" in
  let: "zs" := lnil #() in
  par_map_reduce_reduce_server "n" "csort" "cred" (SOME "jy") "zs";; "zs".


(** Properties about the functional version *)
(** [≡ₚₚ] relates two lists of groups that are permutations of each other
up to [prod_relation (=) (≡ₚ)] on the elements: keys must be equal, and the
value lists must be permutations of each other. *)
Local Infix "≡ₚₚ" :=
  (PermutationA (prod_relation (=) (≡ₚ))) (at level 70, no associativity) : stdpp_scope.
Notation "(≡ₚₚ)" := (PermutationA (prod_relation (=) (≡ₚ))) (only parsing) : stdpp_scope.

(** Lemmas about [group_insert] and [group], mostly stated up to [≡ₚₚ]. *)
Section group.
  Context {A : Type} `{EqDecision K}.
  Implicit Types i : K.
  Implicit Types xs : list A.
  Implicit Types ixs : list (K * A).
  Implicit Types ixss : list (K * list A).

  (** Every key present after an insertion is either the inserted key or was
  already present. *)
  Lemma elem_of_group_insert j i x ixss :
    j ∈ (group_insert i x ixss).*1 → i = j ∨ j ∈ ixss.*1.
  Proof.
    induction ixss as [|[i' x'] ixss IH];
      repeat (simplify_eq/= || case_decide); set_solver.
  Qed.

  (** Two insertions commute up to [≡ₚₚ]. *)
  Lemma group_insert_commute i1 i2 x1 x2 ixss :
    group_insert i1 x1 (group_insert i2 x2 ixss) ≡ₚₚ group_insert i2 x2 (group_insert i1 x1 ixss).
  Proof.
    induction ixss as [|[j x] ixss IH]; repeat (simplify_eq/= || case_decide);
      repeat constructor; done.
  Qed.

  (** Insertion preserves the absence of duplicate keys. *)
  Lemma group_insert_nodup i x ixss :
    NoDup ixss.*1 → NoDup (group_insert i x ixss).*1.
  Proof.
    pose proof @elem_of_group_insert.
    induction ixss as [|[j xs] ixss IH]; csimpl; inversion_clear 1;
      repeat (simplify_eq/= || case_decide); repeat constructor; set_solver.
  Qed.
  (** The keys of [group ixs] are pairwise distinct. *)
  Lemma group_nodup ixs : NoDup (group ixs).*1.
  Proof.
    induction ixs as [|[i x] ixs IH]; csimpl;
      auto using group_insert_nodup, NoDup_nil_2.
  Qed.

  (** [≡ₚₚ] preserves membership of keys. *)
  Lemma grouped_permutation_elem_of ixss1 ixss2 i :
    ixss1 ≡ₚₚ ixss2 → i ∈ ixss1.*1 → i ∈ ixss2.*1.
  Proof.
    induction 1 as [|[i1 xs1] [i2 xs2] ixss1 ixss2 [??]|[i1 xs1] [i2 xs2] ixss|];
      set_solver.
  Qed.

  (** [≡ₚₚ] preserves the absence of duplicate keys. *)
  Lemma grouped_permutation_nodup ixss1 ixss2 :
    ixss1 ≡ₚₚ ixss2 → NoDup ixss1.*1 → NoDup ixss2.*1.
  Proof.
    pose proof @grouped_permutation_elem_of.
    induction 1 as [|[i1 xs1] [i2 xs2] ixss1 ixss2 [??]|[i1 xs1] [i2 xs2] ixss|];
      csimpl; rewrite ?NoDup_cons; try set_solver.
  Qed.

  (** [group_insert] respects [≡ₚₚ], provided the keys of the left-hand list
  are distinct. *)
  Lemma group_insert_perm ixss1 ixss2 i x :
    NoDup ixss1.*1 →
    ixss1 ≡ₚₚ ixss2 → group_insert i x ixss1 ≡ₚₚ group_insert i x ixss2.
  Proof.
    induction 2 as [|[i1 xs1] [i2 xs2] ixss1 ixss2 [??]|[i1 xs1] [i2 xs2] ixss|];
      repeat match goal with
      | _ => progress (simplify_eq/= || case_decide)
      | H : NoDup (_ :: _) |- _ => inversion_clear H
      end; first [repeat constructor; by auto
                 |set_solver
                 |etrans; eauto using grouped_permutation_nodup].
  Qed.

  (** [group] maps permuted inputs to [≡ₚₚ]-related outputs. *)
  Global Instance group_perm : Proper ((≡ₚ) ==> (≡ₚₚ)) (@group A K _).
  Proof.
    induction 1; repeat (simplify_eq/= || case_decide || case_match);
      first [by etrans|auto using group_insert_perm, group_nodup, group_insert_commute].
  Qed.

  (** Grouping a non-empty list whose elements all carry key [i] yields the
  single group [(i, xs)]. *)
  Lemma group_fmap (i : K) xs : xs ≠ [] → group ((i,) <$> xs) ≡ₚₚ [(i, xs)].
  Proof.
    induction xs as [|x1 [|x2 xs] IH]; intros; simplify_eq/=; try done.
    etrans.
    { apply group_insert_perm, IH; auto using group_insert_nodup, group_nodup. }
    simpl; by case_decide.
  Qed.
  (** Inserting under key [i] leaves a trailing group with a different key
  [j] untouched. *)
  Lemma group_insert_snoc ixss i x j ys :
    i ≠ j →
    group_insert i x (ixss ++ [(j, ys)]) ≡ₚₚ group_insert i x ixss ++ [(j,ys)].
  Proof.
    intros. induction ixss as [|[i' xs'] ixss IH];
      repeat (simplify_eq/= || case_decide); repeat constructor; by auto.
  Qed.
  (** Appending a non-empty run of pairs with a fresh key [j] to the input
  appends the group [(j,ys)] to the output. *)
  Lemma group_snoc ixs j ys :
    j ∉ ixs.*1 → ys ≠ [] → group (ixs ++ ((j,) <$> ys)) ≡ₚₚ group ixs ++ [(j,ys)].
  Proof.
    induction ixs as [|[i x] ixs IH]; csimpl; first by auto using group_fmap.
    rewrite ?not_elem_of_cons=> -[??]. etrans; last by apply group_insert_snoc.
    apply group_insert_perm, IH; auto using group_nodup.
  Qed.
End group.

(** [map_reduce] respects permutations of its input, assuming that the
reducer [red j] respects permutations of its value list for every key [j]. *)
Section map_reduce.
  Context {A B C} `{EqDecision K} (map : A → list (K * B)) (red : K → list B → list C).
  Context `{!∀ j, Proper ((≡ₚ) ==> (≡ₚ)) (red j)}.

  (** Binding [curry red] over groups maps [≡ₚₚ]-related group lists to
  permuted outputs. *)
  Global Instance bind_red_perm : Proper ((≡ₚₚ) ==> (≡ₚ)) (mbind (curry red)).
  Proof.
    induction 1 as [|[i1 xs1] [i2 xs2] ixss1 ixss2 [??]|[i1 xs1] [i2 xs2] ixss|];
      simplify_eq/=; try done.
    - repeat (done || f_equiv).
    - by rewrite !assoc_L (comm _ (red i2 xs2)).
    - by etrans.
  Qed.
  (** Permuting the input of [map_reduce] permutes its output. *)
  Global Instance map_reduce_perm : Proper ((≡ₚ) ==> (≡ₚ)) (map_reduce map red).
  Proof. intros xs1 xs2 Hxs. by rewrite /map_reduce /= Hxs. Qed.
End map_reduce.


(** Correctness proofs of the distributed version *)
(** Ghost-state requirements of the map-reduce proofs: one [mapG] instance for
the map phase, over inputs of type [A], and one for the reduce phase, over
groups of type [Z * list B]. *)
Class map_reduceG Σ A B `{Countable A, Countable B} := {
  map_reduce_mapG :> mapG Σ A;
  map_reduce_reduceG :> mapG Σ (Z * list B);
}.

Section mapper.
  (** Section parameters:
  - [A], [B], [C]: types of inputs, intermediate values, and outputs; keys
    are integers ([Z]);
  - [N]: the namespace used in the channel ownership assertions;
  - [IA], [IB], [IC]: predicates relating elements of those types to
    HeapLang values ([IB] also takes the key);
  - [map], [red]: the functional mapper and reducer, where [red j] respects
    permutations of its value list. *)
  Context `{Countable A, Countable B} {C : Type}.
  Context `{!heapG Σ, !proto_chanG Σ, !map_reduceG Σ A B} (N : namespace).
  Context (IA : A → val → iProp Σ) (IB : Z → B → val → iProp Σ) (IC : C → val → iProp Σ).
  Context (map : A → list (Z * B)) (red : Z → list B → list C).
  Context `{!∀ j, Proper ((≡ₚ) ==> (≡ₚ)) (red j)}.
  Local Open Scope nat_scope.
  Implicit Types n : nat.

  (** [IZB iy w]: the value [w] is a pair of the integer key [iy.1] and some
  value [w'] that satisfies [IB iy.1 iy.2 w']. *)
  Definition IZB (iy : Z * B) (w : val) : iProp Σ :=
    (∃ w', ⌜ w = (#iy.1, w')%V ⌝ ∗ IB iy.1 iy.2 w')%I.
  (** [IZBs iys w]: the value [w] is a pair of the integer key [iys.1] and a
  location [l] that holds a linked list of the values [iys.2], each related to
  its HeapLang value by [IB iys.1]. *)
  Definition IZBs (iys : Z * list B) (w : val) : iProp Σ :=
    (∃ (l : loc), ⌜ w = (#iys.1, #l)%V ⌝ ∗ llist (IB iys.1) l (iys.2))%I.
  (** Order on key/value pairs used for sorting: compares the integer keys
  with [≤] and ignores the values. *)
  Definition RZB : relation (Z * B) := prod_relation (≤)%Z (λ _ _ : B, True).

  (** [RZB] is decidable. *)
  Instance RZB_dec : RelDecision RZB.
  Proof. solve_decision. Qed.
  (** [RZB] is total, by totality of [≤] on the integer keys. *)
  Instance RZB_total : Total RZB.
  Proof. intros [i1 y1] [i2 y2]. destruct (total (≤)%Z i1 i2); [left|right]; done. Qed.
  (** [RZB] is transitive; this follows from [prod_relation_trans]. *)
  Instance RZB_trans : Transitive RZB.
  Proof. by apply (prod_relation_trans _). Qed.
  (** [cmpZfst] implements the relation [RZB] on values described by [IZB],
  in the sense of [cmp_spec]. *)
  Lemma RZB_cmp_spec : cmp_spec IZB RZB cmpZfst.
  Proof.
    iIntros ([i1 y1] [i2 y2] v1 v2) "!>"; iIntros (Φ) "[HI1 HI2] HΦ".
    iDestruct "HI1" as (w1 ->) "HI1". iDestruct "HI2" as (w2 ->) "HI2 /=".
    wp_lam; wp_pures. iSpecialize ("HΦ" with "[HI1 HI2]").
    { iSplitL "HI1"; rewrite /IZB /=; eauto with iFrame. }
    repeat case_bool_decide=> //; unfold RZB, prod_relation in *; naive_solver.
  Qed.

  (** Specification of [par_map_reduce_map_server].

  Precondition: [l] holds the remaining inputs [xs], [cmap] follows the map
  worker protocol with [n] workers and outstanding inputs [X], and [csort] is
  in the head phase of the sort protocol, having received [ys]. If [n = 0],
  there must be no outstanding inputs and no remaining inputs.

  Postcondition: the sorter has additionally received some [ys'] that is a
  permutation of [map] applied to all of [xs ++ elements X]. *)
  Lemma par_map_reduce_map_server_spec n cmap csort l xs X ys :
    (n = 0 → X = ∅ ∧ xs = []) →
    {{{
      llist IA l xs ∗
      cmap ↣ map_worker_protocol IA IZB map n (X : gmultiset A) @ N ∗
      csort ↣ sort_fg_head_protocol IZB RZB ys @ N
    }}}
      par_map_reduce_map_server #n cmap csort #l
    {{{ ys', RET #();
      ⌜ys' ≡ₚ (xs ++ elements X) ≫= map⌝ ∗
      csort ↣ sort_fg_head_protocol IZB RZB (ys ++ ys') @ N
    }}}.
  Proof.
    iIntros (Hn Φ) "(Hl & Hcmap & Hcsort) HΦ".
    iLöb as "IH" forall (n xs X ys Hn Φ); wp_rec; wp_pures; simpl.
    case_bool_decide; wp_pures; simplify_eq/=.
    { destruct Hn as [-> ->]; first lia.
      iApply ("HΦ" $! []). rewrite right_id_L. auto. }
    destruct n as [|n]=> //=. wp_branch as %?|%_; wp_pures.
    - wp_apply (lisnil_spec with "Hl"); iIntros "Hl".
      destruct xs as [|x xs]; csimpl; wp_pures.
      + wp_select. wp_pures. rewrite Nat2Z.inj_succ Z.sub_1_r Z.pred_succ.
        iApply ("IH" $! _ [] with "[%] Hl Hcmap Hcsort HΦ"); naive_solver.
      + wp_select. wp_apply (lpop_spec with "Hl"); iIntros (v) "[HIx Hl]".
        wp_send with "[$HIx]".
        wp_apply ("IH" with "[%] Hl Hcmap Hcsort"); first done. iIntros (ys').
        rewrite gmultiset_elements_disj_union gmultiset_elements_singleton.
        rewrite assoc_L -(comm _ [x]). iApply "HΦ".
    - wp_recv (x k) as (Hx) "Hk".
      rewrite -(right_id END%proto _ (sort_fg_head_protocol _ _ _)).
      wp_apply (send_all_spec with "[$Hk $Hcsort]"); iIntros "Hcsort".
      rewrite right_id.
      wp_apply ("IH" with "[] Hl Hcmap Hcsort"); first done.
      iIntros (ys'). iDestruct 1 as (Hys) "Hcsort"; simplify_eq/=.
      rewrite -assoc_L. iApply ("HΦ" $! (map x ++ ys') with "[$Hcsort]").
      iPureIntro. rewrite (gmultiset_disj_union_difference {[ x ]} X)
        -?gmultiset_elem_of_singleton_subseteq //.
      rewrite (comm disj_union) gmultiset_elements_disj_union.
      by rewrite gmultiset_elements_singleton assoc_L bind_app -Hys /= right_id_L comm.
  Qed.

  (** Specification of [par_map_reduce_collect].

  Precondition: the non-empty list [ys] of values for key [i] has been
  collected so far ([l] holds them in reverse), the elements received from
  the sorter so far are [iys_sorted ++ ((i,) <$> ys)], that list is sorted,
  and [i] does not occur among the keys of [iys_sorted].

  Postcondition: some further values [ys'] for key [i] have been collected.
  The result [miy] is either [None], in which case everything received is a
  permutation of [iys] and the channel is at [END], or [Some (j,y,w)] for the
  next element, whose key [j] differs from [i] and does not occur in
  [iys_sorted]; ownership [IB j y w] of that element is returned. *)
  Lemma par_map_reduce_collect_spec csort iys iys_sorted i l ys :
    let acc := from_option (λ '(i,y,w), [(i,y)]) [] in
    let accv := from_option (λ '(i,y,w), SOMEV (#(i:Z),w)) NONEV in
    ys ≠ [] →
    Sorted RZB (iys_sorted ++ ((i,) <$> ys)) →
    i ∉ iys_sorted.*1 →
    {{{
      llist (IB i) l (reverse ys) ∗
      csort ↣ sort_fg_tail_protocol IZB RZB iys (iys_sorted ++ ((i,) <$> ys)) @ N
    }}}
      par_map_reduce_collect csort #i #l
    {{{ ys' miy, RET accv miy;
      ⌜ Sorted RZB ((iys_sorted ++ ((i,) <$> ys ++ ys')) ++ acc miy) ⌝ ∗
      ⌜ from_option (λ '(j,_,_), i ≠ j ∧ j ∉ iys_sorted.*1)
                    (iys_sorted ++ ((i,) <$> ys ++ ys') ≡ₚ iys) miy ⌝ ∗
      llist (IB i) l (reverse (ys ++ ys')) ∗
      csort ↣ from_option (λ _, sort_fg_tail_protocol IZB RZB iys
        ((iys_sorted ++ ((i,) <$> ys ++ ys')) ++ acc miy)) END%proto miy @ N ∗
      from_option (λ '(i,y,w), IB i y w) True miy
    }}}.
  Proof.
    iIntros (acc accv Hys Hsort Hi Φ) "[Hl Hcsort] HΦ".
    iLöb as "IH" forall (ys Hys Hsort Hi Φ); wp_rec; wp_pures; simpl.
    wp_branch as %_|%Hperm; last first; wp_pures.
    { iApply ("HΦ" $! [] None with "[Hl $Hcsort]"); simpl.
      iEval (rewrite !right_id_L); auto with iFrame. }
    wp_recv ([j y] ?) as (Htl w ->) "HIy /=". wp_pures. rewrite -assoc_L.
    case_bool_decide as Hij; simplify_eq/=; wp_pures.
    - wp_apply (lcons_spec with "[$Hl $HIy]"); iIntros "Hl".
      rewrite -reverse_snoc. wp_apply ("IH" $! (ys ++ [y])
        with "[%] [%] [//] Hl [Hcsort] [HΦ]"); try iClear "IH".
      + intros ?; discriminate_list.
      + rewrite fmap_app /= assoc_L. by apply Sorted_snoc.
      + by rewrite fmap_app /= assoc_L.
      + iIntros "!>" (ys' miy). rewrite -!(assoc_L _ ys) /=. iApply "HΦ".
    - iApply ("HΦ" $! [] (Some (j,y,w))).
      rewrite /= !right_id_L assoc_L. iFrame. iPureIntro; split.
      { by apply Sorted_snoc. }
      split; first congruence.
      intros [[j' y'] [-> Hj]]%elem_of_list_fmap.
      destruct Hij; do 2 f_equal.
      destruct ys as [|y'' ys _] using rev_ind; first done.
      move: Htl. rewrite fmap_app assoc_L /=.
      inversion 1 as [|?? [? _]]; discriminate_list || simplify_list_eq.
      assert (RZB (j',y') (i,y'')) as [??]; last (simpl in *; lia).
      apply (Sorted_StronglySorted _) in Hsort.
      eapply elem_of_StronglySorted_app; set_solver.
  Qed.

  (** Specification of [par_map_reduce_reduce_server].

  Precondition: [l] holds the results [zs] accumulated so far, [iys_sorted]
  are the sorted elements already handed out, [miy] is the optional pending
  element (with ownership [IB i y w]), and [cred] follows the map worker
  protocol for [curry red] with [n] workers and outstanding groups [Y]. If
  there is a pending element, its key is not among the keys of [iys_sorted];
  otherwise [iys_sorted] is a permutation of [iys]. If [n = 0], there is no
  pending element and there are no outstanding groups.

  Postcondition: [l] holds [zs' ++ zs], where [zs'] together with the
  reductions of the groups of [iys_sorted] is a permutation of the reductions
  of the groups of [iys] and of [Y]. *)
  Lemma par_map_reduce_reduce_server_spec n iys iys_sorted miy zs l Y csort cred :
    let acc := from_option (λ '(i,y,w), [(i,y)]) [] in
    let accv := from_option (λ '(i,y,w), SOMEV (#(i:Z),w)) NONEV in
    (n = 0 → miy = None ∧ Y = ∅) →
    from_option (λ '(i,_,_), i ∉ iys_sorted.*1) (iys_sorted ≡ₚ iys) miy →
    Sorted RZB (iys_sorted ++ acc miy) →
    {{{
      llist IC l zs ∗
      csort ↣ from_option (λ _, sort_fg_tail_protocol IZB RZB iys
        (iys_sorted ++ acc miy)) END%proto miy @ N ∗
      cred ↣ map_worker_protocol IZBs IC (curry red) n (Y : gmultiset (Z * list B)) @ N ∗
      from_option (λ '(i,y,w), IB i y w) True miy
    }}}
      par_map_reduce_reduce_server #n csort cred (accv miy) #l
    {{{ zs', RET #();
       ⌜ (group iys_sorted ≫= curry red) ++ zs' ≡ₚ (group iys ++ elements Y) ≫= curry red ⌝ ∗
       llist IC l (zs' ++ zs)
    }}}.
  Proof.
    iIntros (acc accv Hn Hmiy Hsort Φ) "(Hl & Hcsort & Hcred & HImiy) HΦ".
    iLöb as "IH" forall (n iys_sorted miy zs Y Hn Hmiy Hsort Φ); wp_rec; wp_pures; simpl.
    case_bool_decide; wp_pures; simplify_eq/=.
    { destruct Hn as [-> ->]; first lia.
      iApply ("HΦ" $! [] with "[$Hl]"); iPureIntro; simpl.
      by rewrite gmultiset_elements_empty !right_id_L Hmiy. }
    destruct n as [|n]=> //=. wp_branch as %?|%_; wp_pures.
    - destruct miy as [[[i y] w]|]; simplify_eq/=; wp_pures; last first.
      + wp_select. wp_pures. rewrite Nat2Z.inj_succ Z.sub_1_r Z.pred_succ.
        iApply ("IH" $! _ _ None
          with "[%] [%] [%] Hl Hcsort Hcred [] HΦ"); naive_solver.
      + wp_apply (lnil_spec (IB i) with "[//]"); iIntros (k) "Hk".
        wp_apply (lcons_spec with "[$Hk $HImiy]"); iIntros "Hk".
        wp_apply (par_map_reduce_collect_spec _ _ _ _ _ [_]
          with "[$Hk $Hcsort]"); try done.
        iIntros (ys' miy). iDestruct 1 as (? Hmiy') "(Hk & Hcsort & HImiy)"; csimpl.
        wp_select; wp_pures. wp_send ((i, reverse (y :: ys'))) with "[Hk]".
        { iExists k; simpl; auto. }
        wp_pures. iApply ("IH" $! _ (_ ++ _) miy
          with "[%] [%] [//] Hl Hcsort Hcred HImiy"); first done.
        { destruct miy as [[[i' y'] w']|]; set_solver +Hmiy'. }
        iIntros "!>" (zs'). iDestruct 1 as (Hperm) "HIC".
        iApply ("HΦ" with "[$HIC]"); iPureIntro; move: Hperm.
        rewrite gmultiset_elements_disj_union gmultiset_elements_singleton.
        rewrite group_snoc // reverse_Permutation.
        rewrite !bind_app /= right_id_L -!assoc_L -(comm _ zs') !assoc_L.
        apply (inj (++ _)).
    - wp_recv ([i ys] k) as (Hy) "Hk".
      wp_apply (lprep_spec with "[$Hl $Hk]"); iIntros "[Hl _]".
      wp_apply ("IH" with "[ ] [//] [//] Hl Hcsort Hcred HImiy"); first done.
      iIntros (zs'); iDestruct 1 as (Hzs) "HIC"; simplify_eq/=.
      iApply ("HΦ" $! (zs' ++ red i ys)). iSplit; last by rewrite -assoc_L.
      iPureIntro. rewrite (gmultiset_disj_union_difference {[ i,ys ]} Y)
        -?gmultiset_elem_of_singleton_subseteq //.
      rewrite (comm disj_union) gmultiset_elements_disj_union.
      rewrite gmultiset_elements_singleton assoc_L Hzs !bind_app /=.
      by rewrite right_id_L !assoc_L.
  Qed.

  (** Top-level specification of [par_map_reduce].

  Given a positive number of workers [n], a HeapLang function [vmap] that
  implements [map], and a HeapLang function [vred] that implements
  [curry red], running [par_map_reduce] on a linked list [l] holding [xs]
  returns a linked list [k] holding some [zs] that is a permutation of the
  functional result [map_reduce map red xs]. *)
  Lemma par_map_reduce_spec n vmap vred l xs :
    0 < n →
    map_spec IA IZB map vmap -∗
    map_spec IZBs IC (curry red) vred -∗
    {{{ llist IA l xs }}}
      par_map_reduce #n vmap vred #l
    {{{ k zs, RET #k; ⌜zs ≡ₚ map_reduce map red xs⌝ ∗ llist IC k zs }}}.
  Proof.
    iIntros (?) "#Hmap #Hred !>"; iIntros (Φ) "Hl HΦ". wp_lam; wp_pures.
    wp_apply (start_map_service_spec with "Hmap [//]"); iIntros (cmap) "Hcmap".
    wp_apply (start_chan_proto_spec N (sort_fg_protocol IZB RZB <++> END)%proto);
      iIntros (csort) "Hcsort".
    { wp_apply (sort_service_fg_spec N with "[] Hcsort"); last by auto.
      iApply RZB_cmp_spec. }
    rewrite right_id.
    wp_apply (par_map_reduce_map_server_spec with "[$Hl $Hcmap $Hcsort]"); first lia.
    iIntros (iys). rewrite gmultiset_elements_empty right_id_L.
    iDestruct 1 as (Hiys) "Hcsort /=". wp_select; wp_pures; simpl.
    wp_apply (start_map_service_spec with "Hred [//]"); iIntros (cred) "Hcred".
    wp_branch as %_|%Hnil; last first.
    { wp_pures. wp_apply (lnil_spec with "[//]"); iIntros (k) "Hk".
      iApply ("HΦ" $! k [] with "[$Hk]"); simpl.
      by rewrite /map_reduce /= -Hiys -Hnil. }
    wp_recv ([i y] ?) as (_ w ->) "HIB /="; wp_pures.
    wp_apply (lnil_spec with "[//]"); iIntros (k) "Hk".
    wp_apply (par_map_reduce_reduce_server_spec _ _ [] (Some (i, y, w))
      with "[$Hk $Hcsort $Hcred $HIB]"); simpl; auto; [lia|set_solver|].
    iIntros (zs). rewrite /= gmultiset_elements_empty !right_id.
    iDestruct 1 as (Hzs) "Hk". wp_pures.
    iApply ("HΦ" with "[$Hk]"). by rewrite Hzs Hiys.
  Qed.
End mapper.