1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
use crate::op_tree::{OpSetMetadata, OpTreeNode};
use crate::query::{binary_search_by, QueryResult, TreeQuery};
use crate::types::{Key, ListEncoding, Op, HEAD};
use std::cmp::Ordering;
use std::fmt::Debug;

#[derive(Debug, Clone, PartialEq)]
pub(crate) struct SeekOpWithPatch<'a> {
    op: Op,
    pub(crate) pos: usize,
    pub(crate) succ: Vec<usize>,
    found: bool,
    encoding: ListEncoding,
    pub(crate) seen: usize,
    pub(crate) last_width: usize,
    last_seen: Option<Key>,
    pub(crate) values: Vec<&'a Op>,
    pub(crate) had_value_before: bool,
}

impl<'a> SeekOpWithPatch<'a> {
    pub(crate) fn new(op: &Op, encoding: ListEncoding) -> Self {
        SeekOpWithPatch {
            op: op.clone(),
            succ: vec![],
            pos: 0,
            found: false,
            encoding,
            seen: 0,
            last_width: 0,
            last_seen: None,
            values: vec![],
            had_value_before: false,
        }
    }

    fn lesser_insert(&self, op: &Op, m: &OpSetMetadata) -> bool {
        op.insert && m.lamport_cmp(op.id, self.op.id) == Ordering::Less
    }

    fn greater_opid(&self, op: &Op, m: &OpSetMetadata) -> bool {
        m.lamport_cmp(op.id, self.op.id) == Ordering::Greater
    }

    fn is_target_insert(&self, op: &Op) -> bool {
        op.insert && op.elemid() == self.op.key.elemid()
    }

    /// Keeps track of the number of visible list elements we have seen. Increments `self.seen` if
    /// operation `e` associates a visible value with a list element, and if we have not already
    /// counted that list element (this ensures that if a list element has several values, i.e.
    /// a conflict, then it is still only counted once).
    fn count_visible(&mut self, e: &Op) {
        if e.elemid() == self.op.elemid() {
            return;
        }
        if e.insert {
            self.last_seen = None
        }
        if e.visible() && self.last_seen.is_none() {
            self.seen += e.width(self.encoding);
            self.last_seen = Some(e.elemid_or_key())
        }
    }
}

impl<'a> TreeQuery<'a> for SeekOpWithPatch<'a> {
    fn query_node_with_metadata(
        &mut self,
        child: &'a OpTreeNode,
        m: &OpSetMetadata,
        ops: &[Op],
    ) -> QueryResult {
        if self.found {
            return QueryResult::Descend;
        }
        match self.op.key {
            // Special case for insertion at the head of the list (`e == HEAD` is only possible for
            // an insertion operation). Skip over any list elements whose elemId is greater than
            // the opId of the operation being inserted.
            Key::Seq(e) if e == HEAD => {
                while self.pos < child.len() {
                    let op = &ops[child.get(self.pos).unwrap()];
                    if op.insert && m.lamport_cmp(op.id, self.op.id) == Ordering::Less {
                        break;
                    }
                    self.count_visible(op);
                    self.pos += 1;
                }
                QueryResult::Finish
            }

            // Updating a list: search for the tree node that contains the new operation's
            // reference element (i.e. the element we're updating or inserting after)
            Key::Seq(e) => {
                if self.found || child.index.ops.contains(&e.0) {
                    QueryResult::Descend
                } else {
                    self.pos += child.len();

                    // When we skip over a subtree, we need to count the number of visible list
                    // elements we're skipping over. Each node stores the number of visible
                    // elements it contains. However, it could happen that a visible element is
                    // split across two tree nodes. To avoid double-counting in this situation, we
                    // subtract one if the last visible element also appears in this tree node.
                    let mut num_vis = child.index.visible_len(self.encoding);
                    if num_vis > 0 {
                        // FIXME: I think this is wrong: we should subtract one only if this
                        // subtree contains a *visible* (i.e. empty succs) operation for the list
                        // element with elemId `last_seen`; this will subtract one even if all
                        // values for this list element have been deleted in this subtree.
                        if let Some(last_seen) = self.last_seen {
                            if child.index.has_visible(&last_seen) {
                                num_vis -= 1;
                            }
                        }
                        self.seen += num_vis;

                        // FIXME: this is also wrong: `last_seen` needs to be the elemId of the
                        // last *visible* list element in this subtree, but I think this returns
                        // the last operation's elemId regardless of whether it's visible or not.
                        // This will lead to incorrect counting if `last_seen` is not visible: it's
                        // not counted towards `num_vis`, so we shouldn't be subtracting 1.
                        self.last_seen = Some(ops[child.last()].elemid_or_key());
                    }
                    QueryResult::Next
                }
            }

            // Updating a map: operations appear in sorted order by key
            Key::Map(_) => {
                let start = binary_search_by(child, ops, |op| m.key_cmp(&op.key, &self.op.key));
                self.pos = start;
                QueryResult::Skip(start)
            }
        }
    }

    // Only called when operating on a sequence (list/text) object, since updates of a map are
    // handled in `query_node_with_metadata`.
    fn query_element_with_metadata(&mut self, e: &'a Op, m: &OpSetMetadata) -> QueryResult {
        match self.op.key {
            Key::Map(_) => {
                if !self.found {
                    // Iterate over any existing operations for the same key; stop when we reach an
                    // operation with a different key
                    if e.key != self.op.key {
                        return QueryResult::Finish;
                    }

                    // Keep track of any ops we're overwriting and any conflicts on this key
                    if self.op.overwrites(e) {
                        // when we encounter an increment op we also want to find the counter for
                        // it.
                        if self.op.is_inc() && e.is_counter() && e.visible() {
                            self.values.push(e);
                        }
                        self.succ.push(self.pos);
                        self.last_width = e.width(self.encoding);

                        if e.visible() {
                            self.had_value_before = true;
                        }
                    } else if e.visible() {
                        self.values.push(e);
                    }

                    // Ops for the same key should be in ascending order of opId, so we break when
                    // we reach an op with an opId greater than that of the new operation
                    if m.lamport_cmp(e.id, self.op.id) == Ordering::Greater {
                        self.found = true;
                        return QueryResult::Next;
                    }

                    self.pos += 1;
                } else {
                    // For the purpose of reporting conflicts, we also need to take into account any
                    // ops for the same key that appear after the new operation

                    if e.key != self.op.key {
                        return QueryResult::Finish;
                    }
                    // No need to check if `self.op.overwrites(op)` because an operation's `preds`
                    // must always have lower Lamport timestamps than that op itself, and the ops
                    // here all have greater opIds than the new op
                    if e.visible() {
                        self.values.push(e);
                    }
                }
                QueryResult::Next
            }
            Key::Seq(_) => {
                let result = if !self.found {
                    // First search for the referenced list element (i.e. the element we're updating, or
                    // after which we're inserting)
                    if self.is_target_insert(e) {
                        self.found = true;
                        if self.op.overwrites(e) {
                            // when we encounter an increment op we also want to find the counter for
                            // it.
                            if self.op.is_inc() && e.is_counter() && e.visible() {
                                self.values.push(e);
                            }
                            self.succ.push(self.pos);
                            self.last_width = e.width(self.encoding);
                        }
                        if e.visible() {
                            self.had_value_before = true;
                        }
                    }
                    self.pos += 1;
                    QueryResult::Next
                } else {
                    // Once we've found the reference element, keep track of any ops that we're overwriting
                    let overwritten = self.op.overwrites(e);
                    if overwritten {
                        // when we encounter an increment op we also want to find the counter for
                        // it.
                        if self.op.is_inc() && e.is_counter() && e.visible() {
                            self.values.push(e);
                        }
                        self.succ.push(self.pos);
                        self.last_width = e.width(self.encoding);
                    }

                    // If the new op is an insertion, skip over any existing list elements whose elemId is
                    // greater than the ID of the new insertion
                    if self.op.insert {
                        if self.lesser_insert(e, m) {
                            // Insert before the first existing list element whose elemId is less than that
                            // of the new insertion
                            QueryResult::Finish
                        } else {
                            self.pos += 1;
                            QueryResult::Next
                        }
                    } else if e.insert {
                        // If the new op is an update of an existing list element, the first insertion op
                        // we encounter after the reference element indicates the end of the reference elem
                        QueryResult::Finish
                    } else {
                        // When updating an existing list element, keep track of any conflicts on this list
                        // element. We also need to remember if the list element had any visible elements
                        // prior to applying the new operation: if not, the new operation is resurrecting
                        // a deleted list element, so it looks like an insertion in the patch.
                        if e.visible() {
                            self.had_value_before = true;
                            if !overwritten {
                                self.values.push(e);
                            }
                        }

                        // We now need to put the ops for the same list element into ascending order, so we
                        // skip over any ops whose ID is less than that of the new operation.
                        if !self.greater_opid(e, m) {
                            self.pos += 1;
                        }
                        QueryResult::Next
                    }
                };

                // The patch needs to know the list index of each operation, so we count the number of
                // visible list elements up to the insertion position of the new operation
                if result == QueryResult::Next {
                    self.count_visible(e);
                }
                result
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::{super::seek_op::tests::optree_with_only_internally_visible_ops, SeekOpWithPatch};
    use crate::{
        op_tree::B,
        types::{ListEncoding, ObjId},
    };

    #[test]
    fn test_insert_on_internal_only_nodes() {
        let (set, new_op) = optree_with_only_internally_visible_ops();

        let q = SeekOpWithPatch::new(&new_op, ListEncoding::List);
        let q = set.search(&ObjId::root(), q);

        // we've inserted `B - 1` elements for "a", so the index should be `B`
        assert_eq!(q.pos, B);
    }
}