001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
import static org.apache.hadoop.hbase.util.Order.ASCENDING;
import static org.apache.hadoop.hbase.util.Order.DESCENDING;

import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import org.apache.yetus.audience.InterfaceAudience;
028
029/**
030 * Utility class that handles ordered byte arrays. That is, unlike {@link Bytes}, these methods
031 * produce byte arrays which maintain the sort order of the original values.
032 * <h3>Encoding Format summary</h3>
033 * <p>
034 * Each value is encoded as one or more bytes. The first byte of the encoding, its meaning, and a
035 * terse description of the bytes that follow is given by the following table:
036 * </p>
037 * <table summary="Encodings">
038 * <tr>
039 * <th>Content Type</th>
040 * <th>Encoding</th>
041 * </tr>
042 * <tr>
043 * <td>NULL</td>
044 * <td>0x05</td>
045 * </tr>
046 * <tr>
047 * <td>negative infinity</td>
048 * <td>0x07</td>
049 * </tr>
050 * <tr>
051 * <td>negative large</td>
052 * <td>0x08, ~E, ~M</td>
053 * </tr>
054 * <tr>
055 * <td>negative medium</td>
056 * <td>0x13-E, ~M</td>
057 * </tr>
058 * <tr>
059 * <td>negative small</td>
060 * <td>0x14, -E, ~M</td>
061 * </tr>
062 * <tr>
063 * <td>zero</td>
064 * <td>0x15</td>
065 * </tr>
066 * <tr>
067 * <td>positive small</td>
068 * <td>0x16, ~-E, M</td>
069 * </tr>
070 * <tr>
071 * <td>positive medium</td>
072 * <td>0x17+E, M</td>
073 * </tr>
074 * <tr>
075 * <td>positive large</td>
076 * <td>0x22, E, M</td>
077 * </tr>
078 * <tr>
079 * <td>positive infinity</td>
080 * <td>0x23</td>
081 * </tr>
082 * <tr>
083 * <td>NaN</td>
 * <td>0x26</td>
085 * </tr>
086 * <tr>
087 * <td>fixed-length 32-bit integer</td>
 * <td>0x2b, I</td>
089 * </tr>
090 * <tr>
091 * <td>fixed-length 64-bit integer</td>
 * <td>0x2c, I</td>
093 * </tr>
094 * <tr>
095 * <td>fixed-length 8-bit integer</td>
096 * <td>0x29</td>
097 * </tr>
098 * <tr>
099 * <td>fixed-length 16-bit integer</td>
100 * <td>0x2a</td>
101 * </tr>
102 * <tr>
103 * <td>fixed-length 32-bit float</td>
104 * <td>0x30, F</td>
105 * </tr>
106 * <tr>
107 * <td>fixed-length 64-bit float</td>
108 * <td>0x31, F</td>
109 * </tr>
110 * <tr>
111 * <td>TEXT</td>
 * <td>0x34, T</td>
113 * </tr>
114 * <tr>
115 * <td>variable length BLOB</td>
 * <td>0x37, B</td>
117 * </tr>
118 * <tr>
119 * <td>byte-for-byte BLOB</td>
 * <td>0x38, X</td>
121 * </tr>
122 * </table>
123 * <h3>Null Encoding</h3>
124 * <p>
125 * Each value that is a NULL encodes as a single byte of 0x05. Since every other value encoding
126 * begins with a byte greater than 0x05, this forces NULL values to sort first.
127 * </p>
128 * <h3>Text Encoding</h3>
129 * <p>
 * Each text value begins with a single byte of 0x34 and ends with a single byte of 0x00. There are
131 * zero or more intervening bytes that encode the text value. The intervening bytes are chosen so
132 * that the encoding will sort in the desired collating order. The intervening bytes may not contain
133 * a 0x00 character; the only 0x00 byte allowed in a text encoding is the final byte.
134 * </p>
135 * <p>
136 * The text encoding ends in 0x00 in order to ensure that when there are two strings where one is a
137 * prefix of the other that the shorter string will sort first.
138 * </p>
139 * <h3>Binary Encoding</h3>
140 * <p>
141 * There are two encoding strategies for binary fields, referred to as "BlobVar" and "BlobCopy".
142 * BlobVar is less efficient in both space and encoding time. It has no limitations on the range of
143 * encoded values. BlobCopy is a byte-for-byte copy of the input data followed by a termination
144 * byte. It is extremely fast to encode and decode. It carries the restriction of not allowing a
145 * 0x00 value in the input byte[] as this value is used as the termination byte.
146 * </p>
147 * <h4>BlobVar</h4>
148 * <p>
149 * "BlobVar" encodes the input byte[] in a manner similar to a variable length integer encoding. As
150 * with the other {@code OrderedBytes} encodings, the first encoded byte is used to indicate what
151 * kind of value follows. This header byte is 0x37 for BlobVar encoded values. As with the
152 * traditional varint encoding, the most significant bit of each subsequent encoded {@code byte} is
153 * used as a continuation marker. The 7 remaining bits contain the 7 most significant bits of the
154 * first unencoded byte. The next encoded byte starts with a continuation marker in the MSB. The
155 * least significant bit from the first unencoded byte follows, and the remaining 6 bits contain the
156 * 6 MSBs of the second unencoded byte. The encoding continues, encoding 7 bytes on to 8 encoded
157 * bytes. The MSB of the final encoded byte contains a termination marker rather than a continuation
158 * marker, and any remaining bits from the final input byte. Any trailing bits in the final encoded
159 * byte are zeros.
160 * </p>
161 * <h4>BlobCopy</h4>
162 * <p>
163 * "BlobCopy" is a simple byte-for-byte copy of the input data. It uses 0x38 as the header byte, and
164 * is terminated by 0x00 in the DESCENDING case. This alternative encoding is faster and more
165 * space-efficient, but it cannot accept values containing a 0x00 byte in DESCENDING order.
166 * </p>
167 * <h3>Variable-length Numeric Encoding</h3>
168 * <p>
169 * Numeric values must be coded so as to sort in numeric order. We assume that numeric values can be
170 * both integer and floating point values. Clients must be careful to use inspection methods for
 * encoded values (such as {@link #isNumericInfinite(PositionedByteRange)} and
 * {@link #isNumericNaN(PositionedByteRange)}) to protect against decoding values into objects
 * which do not support these numeric concepts (such as {@link Long} and {@link BigDecimal}).
174 * </p>
175 * <p>
 * Simplest cases first: If the numeric value is a NaN, then the encoding is a single byte of 0x26.
177 * This causes NaN values to sort after every other numeric value.
178 * </p>
179 * <p>
 * If the numeric value is a negative infinity then the encoding is a single byte of 0x07. Since
 * every other numeric value, NaN included, has a larger initial byte, this encoding ensures that
 * negative infinity will sort prior to every other numeric value.
183 * </p>
184 * <p>
 * If the numeric value is a positive infinity then the encoding is a single byte of 0x23. Every
 * other numeric value encoding except NaN begins with a smaller byte, ensuring that positive
 * infinity sorts after every numeric value other than NaN. 0x23 and the NaN byte 0x26 are both
 * smaller than 0x34, the initial byte of a text value, ensuring that every numeric value sorts
 * before every text value.
189 * </p>
190 * <p>
191 * If the numeric value is exactly zero then it is encoded as a single byte of 0x15. Finite negative
192 * values will have initial bytes of 0x08 through 0x14 and finite positive values will have initial
193 * bytes of 0x16 through 0x22.
194 * </p>
195 * <p>
196 * For all numeric values, we compute a mantissa M and an exponent E. The mantissa is a base-100
197 * representation of the value. The exponent E determines where to put the decimal point.
198 * </p>
199 * <p>
200 * Each centimal digit of the mantissa is stored in a byte. If the value of the centimal digit is X
201 * (hence X&ge;0 and X&le;99) then the byte value will be 2*X+1 for every byte of the mantissa,
202 * except for the last byte which will be 2*X+0. The mantissa must be the minimum number of bytes
203 * necessary to represent the value; trailing X==0 digits are omitted. This means that the mantissa
204 * will never contain a byte with the value 0x00.
205 * </p>
206 * <p>
207 * If we assume all digits of the mantissa occur to the right of the decimal point, then the
208 * exponent E is the power of one hundred by which one must multiply the mantissa to recover the
209 * original value.
210 * </p>
211 * <p>
212 * Values are classified as large, medium, or small according to the value of E. If E is 11 or more,
213 * the value is large. For E between 0 and 10, the value is medium. For E less than zero, the value
214 * is small.
215 * </p>
216 * <p>
217 * Large positive values are encoded as a single byte 0x22 followed by E as a varint and then M.
218 * Medium positive values are a single byte of 0x17+E followed by M. Small positive values are
219 * encoded as a single byte 0x16 followed by the ones-complement of the varint for -E followed by M.
220 * </p>
221 * <p>
222 * Small negative values are encoded as a single byte 0x14 followed by -E as a varint and then the
223 * ones-complement of M. Medium negative values are encoded as a byte 0x13-E followed by the
224 * ones-complement of M. Large negative values consist of the single byte 0x08 followed by the
225 * ones-complement of the varint encoding of E followed by the ones-complement of M.
226 * </p>
227 * <h3>Fixed-length Integer Encoding</h3>
228 * <p>
229 * All 4-byte integers are serialized to a 5-byte, fixed-width, sortable byte format. All 8-byte
 * integers are serialized to the equivalent 9-byte format. Serialization is performed by writing a
231 * header byte, inverting the integer sign bit and writing the resulting bytes to the byte array in
232 * big endian order.
233 * </p>
234 * <h3>Fixed-length Floating Point Encoding</h3>
235 * <p>
236 * 32-bit and 64-bit floating point numbers are encoded to a 5-byte and 9-byte encoding format,
237 * respectively. The format is identical, save for the precision respected in each step of the
 * operation.
 * </p>
 * <p>
 * This format ensures the following total ordering of floating point values:
 * Float.NEGATIVE_INFINITY &lt; -Float.MAX_VALUE &lt; ... &lt; -Float.MIN_VALUE &lt; -0.0 &lt; +0.0
242 * &lt; Float.MIN_VALUE &lt; ... &lt; Float.MAX_VALUE &lt; Float.POSITIVE_INFINITY &lt; Float.NaN
243 * </p>
244 * <p>
245 * Floating point numbers are encoded as specified in IEEE 754. A 32-bit single precision float
246 * consists of a sign bit, 8-bit unsigned exponent encoded in offset-127 notation, and a 23-bit
247 * significand. The format is described further in the
248 * <a href="http://en.wikipedia.org/wiki/Single_precision"> Single Precision Floating Point
249 * Wikipedia page</a>
250 * </p>
251 * <p>
252 * The value of a normal float is -1 <sup>sign bit</sup> &times; 2<sup>exponent - 127</sup> &times;
253 * 1.significand
254 * </p>
255 * <p>
 * The IEEE 754 floating point format already preserves sort ordering for positive floating point
257 * numbers when the raw bytes are compared in most significant byte order. This is discussed further
258 * at <a href= "http://www.cygnus-software.com/papers/comparingfloats/comparingfloats.htm">
259 * http://www.cygnus-software.com/papers/comparingfloats/comparingfloats.htm</a>
260 * </p>
261 * <p>
 * Thus, we need only ensure that negative numbers sort in the exact opposite order as positive
263 * numbers (so that say, negative infinity is less than negative 1), and that all negative numbers
264 * compare less than any positive number. To accomplish this, we invert the sign bit of all floating
265 * point numbers, and we also invert the exponent and significand bits if the floating point number
266 * was negative.
267 * </p>
268 * <p>
269 * More specifically, we first store the floating point bits into a 32-bit int {@code j} using
270 * {@link Float#floatToIntBits}. This method collapses all NaNs into a single, canonical NaN value
271 * but otherwise leaves the bits unchanged. We then compute
272 * </p>
273 *
274 * <pre>
 * j &circ;= (j &gt;&gt; (Integer.SIZE - 1)) | Integer.MIN_VALUE
276 * </pre>
277 * <p>
278 * which inverts the sign bit and XOR's all other bits with the sign bit itself. Comparing the raw
279 * bytes of {@code j} in most significant byte order is equivalent to performing a single precision
280 * floating point comparison on the underlying bits (ignoring NaN comparisons, as NaNs don't compare
281 * equal to anything when performing floating point comparisons).
282 * </p>
283 * <p>
284 * The resulting integer is then converted into a byte array by serializing the integer one byte at
285 * a time in most significant byte order. The serialized integer is prefixed by a single header
286 * byte. All serialized values are 5 bytes in length.
287 * </p>
288 * <p>
289 * {@code OrderedBytes} encodings are heavily influenced by the
290 * <a href="http://sqlite.org/src4/doc/trunk/www/key_encoding.wiki">SQLite4 Key Encoding</a>. Slight
 * deviations are made in the interest of order correctness and user extensibility. Fixed-width
292 * {@code Long} and {@link Double} encodings are based on implementations from the now defunct
293 * Orderly library.
294 * </p>
295 */
296@InterfaceAudience.Public
297public class OrderedBytes {
298
  /*
   * These constants define the header bytes used to identify encoded values. The list is not
   * exhaustive: the variable-length Numeric format folds part of its value (the exponent of
   * medium-magnitude numbers) into the header byte, so every byte between NEG_MED_MIN and
   * NEG_MED_MAX, and between POS_MED_MIN and POS_MED_MAX, is also a valid header.
   *
   * The values listed here are directly applied to persisted data -- DO NOT modify the values
   * specified here. Instead, gaps are placed intentionally between values so that new
   * implementations can be inserted into the total ordering enforced here.
   */
  private static final byte NULL = 0x05;
  // room for 1 expansion type (0x06)
  private static final byte NEG_INF = 0x07;
  private static final byte NEG_LARGE = 0x08;
  // medium negative values use the header NEG_MED_MAX - E
  private static final byte NEG_MED_MIN = 0x09;
  private static final byte NEG_MED_MAX = 0x13;
  private static final byte NEG_SMALL = 0x14;
  private static final byte ZERO = 0x15;
  private static final byte POS_SMALL = 0x16;
  // medium positive values use the header POS_MED_MIN + E
  private static final byte POS_MED_MIN = 0x17;
  private static final byte POS_MED_MAX = 0x21;
  private static final byte POS_LARGE = 0x22;
  private static final byte POS_INF = 0x23;
  // room for 2 expansion types (0x24, 0x25)
  private static final byte NAN = 0x26;
  // room for 2 expansion types (0x27, 0x28)
  private static final byte FIXED_INT8 = 0x29;
  private static final byte FIXED_INT16 = 0x2a;
  private static final byte FIXED_INT32 = 0x2b;
  private static final byte FIXED_INT64 = 0x2c;
  // room for 3 expansion types (0x2d - 0x2f)
  private static final byte FIXED_FLOAT32 = 0x30;
  private static final byte FIXED_FLOAT64 = 0x31;
  // room for 2 expansion types (0x32, 0x33)
  private static final byte TEXT = 0x34;
  // room for 2 expansion types (0x35, 0x36)
  private static final byte BLOB_VAR = 0x37;
  private static final byte BLOB_COPY = 0x38;
334
335  /*
336   * The following constant values are used by encoding implementations
337   */
338
339  public static final Charset UTF8 = Charset.forName("UTF-8");
340  private static final byte TERM = 0x00;
341
342  /**
343   * Max precision guaranteed to fit into a {@code long}.
344   */
345  public static final int MAX_PRECISION = 31;
346
347  /**
348   * The context used to normalize {@link BigDecimal} values.
349   */
350  public static final MathContext DEFAULT_MATH_CONTEXT =
351    new MathContext(MAX_PRECISION, RoundingMode.HALF_UP);
352
353  /**
354   * Creates the standard exception when the encoded header byte is unexpected for the decoding
355   * context.
356   * @param header value used in error message.
357   */
358  private static IllegalArgumentException unexpectedHeader(byte header) {
359    throw new IllegalArgumentException(
360      "unexpected value in first byte: 0x" + Long.toHexString(header));
361  }
362
363  /**
364   * Perform unsigned comparison between two long values. Conforms to the same interface as
365   * {@link org.apache.hadoop.hbase.CellComparator}.
366   */
367  private static int unsignedCmp(long x1, long x2) {
368    int cmp;
369    if ((cmp = (x1 < x2 ? -1 : (x1 == x2 ? 0 : 1))) == 0) return 0;
370    // invert the result when either value is negative
371    if ((x1 < 0) != (x2 < 0)) return -cmp;
372    return cmp;
373  }
374
375  /**
376   * Write a 32-bit unsigned integer to {@code dst} as 4 big-endian bytes.
377   * @return number of bytes written.
378   */
379  private static int putUint32(PositionedByteRange dst, int val) {
380    dst.put((byte) (val >>> 24)).put((byte) (val >>> 16)).put((byte) (val >>> 8)).put((byte) val);
381    return 4;
382  }
383
384  /**
385   * Encode an unsigned 64-bit unsigned integer {@code val} into {@code dst}.
386   * @param dst  The destination to which encoded bytes are written.
387   * @param val  The value to write.
388   * @param comp Compliment the encoded value when {@code comp} is true.
389   * @return number of bytes written.
390   */
391  @InterfaceAudience.Private
392  static int putVaruint64(PositionedByteRange dst, long val, boolean comp) {
393    int w, y, len = 0;
394    final int offset = dst.getOffset(), start = dst.getPosition();
395    byte[] a = dst.getBytes();
396    Order ord = comp ? DESCENDING : ASCENDING;
397    if (-1 == unsignedCmp(val, 241L)) {
398      dst.put((byte) val);
399      len = dst.getPosition() - start;
400      ord.apply(a, offset + start, len);
401      return len;
402    }
403    if (-1 == unsignedCmp(val, 2288L)) {
404      y = (int) (val - 240);
405      dst.put((byte) (y / 256 + 241)).put((byte) (y % 256));
406      len = dst.getPosition() - start;
407      ord.apply(a, offset + start, len);
408      return len;
409    }
410    if (-1 == unsignedCmp(val, 67824L)) {
411      y = (int) (val - 2288);
412      dst.put((byte) 249).put((byte) (y / 256)).put((byte) (y % 256));
413      len = dst.getPosition() - start;
414      ord.apply(a, offset + start, len);
415      return len;
416    }
417    y = (int) val;
418    w = (int) (val >>> 32);
419    if (w == 0) {
420      if (-1 == unsignedCmp(y, 16777216L)) {
421        dst.put((byte) 250).put((byte) (y >>> 16)).put((byte) (y >>> 8)).put((byte) y);
422        len = dst.getPosition() - start;
423        ord.apply(a, offset + start, len);
424        return len;
425      }
426      dst.put((byte) 251);
427      putUint32(dst, y);
428      len = dst.getPosition() - start;
429      ord.apply(a, offset + start, len);
430      return len;
431    }
432    if (-1 == unsignedCmp(w, 256L)) {
433      dst.put((byte) 252).put((byte) w);
434      putUint32(dst, y);
435      len = dst.getPosition() - start;
436      ord.apply(a, offset + start, len);
437      return len;
438    }
439    if (-1 == unsignedCmp(w, 65536L)) {
440      dst.put((byte) 253).put((byte) (w >>> 8)).put((byte) w);
441      putUint32(dst, y);
442      len = dst.getPosition() - start;
443      ord.apply(a, offset + start, len);
444      return len;
445    }
446    if (-1 == unsignedCmp(w, 16777216L)) {
447      dst.put((byte) 254).put((byte) (w >>> 16)).put((byte) (w >>> 8)).put((byte) w);
448      putUint32(dst, y);
449      len = dst.getPosition() - start;
450      ord.apply(a, offset + start, len);
451      return len;
452    }
453    dst.put((byte) 255);
454    putUint32(dst, w);
455    putUint32(dst, y);
456    len = dst.getPosition() - start;
457    ord.apply(a, offset + start, len);
458    return len;
459  }
460
461  /**
462   * Inspect {@code src} for an encoded varuint64 for its length in bytes. Preserves the state of
463   * {@code src}.
464   * @param src  source buffer
465   * @param comp if true, parse the compliment of the value.
466   * @return the number of bytes consumed by this value.
467   */
468  @InterfaceAudience.Private
469  static int lengthVaruint64(PositionedByteRange src, boolean comp) {
470    int a0 = (comp ? DESCENDING : ASCENDING).apply(src.peek()) & 0xff;
471    if (a0 <= 240) return 1;
472    if (a0 <= 248) return 2;
473    if (a0 == 249) return 3;
474    if (a0 == 250) return 4;
475    if (a0 == 251) return 5;
476    if (a0 == 252) return 6;
477    if (a0 == 253) return 7;
478    if (a0 == 254) return 8;
479    if (a0 == 255) return 9;
480    throw unexpectedHeader(src.peek());
481  }
482
483  /**
484   * Skip {@code src} over the encoded varuint64.
485   * @param src source buffer
486   * @param cmp if true, parse the compliment of the value.
487   * @return the number of bytes skipped.
488   */
489  @InterfaceAudience.Private
490  static int skipVaruint64(PositionedByteRange src, boolean cmp) {
491    final int len = lengthVaruint64(src, cmp);
492    src.setPosition(src.getPosition() + len);
493    return len;
494  }
495
496  /**
497   * Decode a sequence of bytes in {@code src} as a varuint64. Compliment the encoded value when
498   * {@code comp} is true.
499   * @return the decoded value.
500   */
501  @InterfaceAudience.Private
502  static long getVaruint64(PositionedByteRange src, boolean comp) {
503    assert src.getRemaining() >= lengthVaruint64(src, comp);
504    final long ret;
505    Order ord = comp ? DESCENDING : ASCENDING;
506    byte x = src.get();
507    final int a0 = ord.apply(x) & 0xff, a1, a2, a3, a4, a5, a6, a7, a8;
508    if (-1 == unsignedCmp(a0, 241)) {
509      return a0;
510    }
511    x = src.get();
512    a1 = ord.apply(x) & 0xff;
513    if (-1 == unsignedCmp(a0, 249)) {
514      return (a0 - 241L) * 256 + a1 + 240;
515    }
516    x = src.get();
517    a2 = ord.apply(x) & 0xff;
518    if (a0 == 249) {
519      return 2288L + 256 * a1 + a2;
520    }
521    x = src.get();
522    a3 = ord.apply(x) & 0xff;
523    if (a0 == 250) {
524      return ((long) a1 << 16L) | (a2 << 8) | a3;
525    }
526    x = src.get();
527    a4 = ord.apply(x) & 0xff;
528    ret = (((long) a1) << 24) | (a2 << 16) | (a3 << 8) | a4;
529    if (a0 == 251) {
530      return ret;
531    }
532    x = src.get();
533    a5 = ord.apply(x) & 0xff;
534    if (a0 == 252) {
535      return (ret << 8) | a5;
536    }
537    x = src.get();
538    a6 = ord.apply(x) & 0xff;
539    if (a0 == 253) {
540      return (ret << 16) | (a5 << 8) | a6;
541    }
542    x = src.get();
543    a7 = ord.apply(x) & 0xff;
544    if (a0 == 254) {
545      return (ret << 24) | (a5 << 16) | (a6 << 8) | a7;
546    }
547    x = src.get();
548    a8 = ord.apply(x) & 0xff;
549    return (ret << 32) | (((long) a5) << 24) | (a6 << 16) | (a7 << 8) | a8;
550  }
551
552  /**
553   * Strip all trailing zeros to ensure that no digit will be zero and round using our default
554   * context to ensure precision doesn't exceed max allowed. From Phoenix's {@code NumberUtil}.
555   * @return new {@link BigDecimal} instance
556   */
557  @InterfaceAudience.Private
558  static BigDecimal normalize(BigDecimal val) {
559    return null == val ? null : val.stripTrailingZeros().round(DEFAULT_MATH_CONTEXT);
560  }
561
562  /**
563   * Read significand digits from {@code src} according to the magnitude of {@code e}.
564   * @param src  The source from which to read encoded digits.
565   * @param e    The magnitude of the first digit read.
566   * @param comp Treat encoded bytes as compliments when {@code comp} is true.
567   * @return The decoded value.
568   * @throws IllegalArgumentException when read exceeds the remaining length of {@code src}.
569   */
570  private static BigDecimal decodeSignificand(PositionedByteRange src, int e, boolean comp) {
571    // TODO: can this be made faster?
572    byte[] a = src.getBytes();
573    final int start = src.getPosition(), offset = src.getOffset(), remaining = src.getRemaining();
574    Order ord = comp ? DESCENDING : ASCENDING;
575    BigDecimal m;
576    StringBuilder sb = new StringBuilder();
577    for (int i = 0;; i++) {
578      if (i > remaining) {
579        // we've exceeded this range's window
580        src.setPosition(start);
581        throw new IllegalArgumentException(
582          "Read exceeds range before termination byte found. offset: " + offset + " position: "
583            + (start + i));
584      }
585      // one byte -> 2 digits
586      // base-100 digits are encoded as val * 2 + 1 except for the termination digit.
587      int twoDigits = (ord.apply(a[offset + start + i]) & 0xff) / 2;
588      sb.append(String.format("%02d", twoDigits));
589      // detect termination digit
590      // Besides, as we will normalise the return value at last,
591      // we only need to decode at most MAX_PRECISION + 2 digits here.
592      if ((ord.apply(a[offset + start + i]) & 1) == 0 || sb.length() > MAX_PRECISION + 1) {
593        src.setPosition(start + i + 1);
594        break;
595      }
596    }
597    m = new BigDecimal(sb.toString());
598    int stepsMoveLeft = sb.charAt(0) != '0' ? m.precision() : m.precision() + 1;
599    stepsMoveLeft -= e * 2;
600
601    return normalize(m.movePointLeft(stepsMoveLeft));
602  }
603
604  /**
605   * Skip {@code src} over the significand bytes.
606   * @param src  The source from which to read encoded digits.
607   * @param comp Treat encoded bytes as compliments when {@code comp} is true.
608   * @return the number of bytes skipped.
609   */
610  private static int skipSignificand(PositionedByteRange src, boolean comp) {
611    byte[] a = src.getBytes();
612    final int offset = src.getOffset(), start = src.getPosition();
613    int i = src.getPosition();
614    while (((comp ? DESCENDING : ASCENDING).apply(a[offset + i++]) & 1) != 0)
615      ;
616    src.setPosition(i);
617    return i - start;
618  }
619
620  /**
621   * <p>
622   * Encode the small magnitude floating point number {@code val} using the key encoding. The caller
623   * guarantees that 1.0 > abs(val) > 0.0.
624   * </p>
625   * <p>
626   * A floating point value is encoded as an integer exponent {@code E} and a mantissa {@code M}.
627   * The original value is equal to {@code (M * 100^E)}. {@code E} is set to the smallest value
628   * possible without making {@code M} greater than or equal to 1.0.
629   * </p>
630   * <p>
631   * For this routine, {@code E} will always be zero or negative, since the original value is less
632   * than one. The encoding written by this routine is the ones-complement of the varint of the
633   * negative of {@code E} followed by the mantissa:
634   *
635   * <pre>
636   *   Encoding:   ~-E  M
637   * </pre>
638   * </p>
639   * @param dst The destination to which encoded digits are written.
640   * @param val The value to encode.
641   * @return the number of bytes written.
642   */
643  private static int encodeNumericSmall(PositionedByteRange dst, BigDecimal val) {
644    // TODO: this can be done faster?
645    // assert 1.0 > abs(val) > 0.0
646    BigDecimal abs = val.abs();
647    assert BigDecimal.ZERO.compareTo(abs) < 0 && BigDecimal.ONE.compareTo(abs) > 0;
648    byte[] a = dst.getBytes();
649    boolean isNeg = val.signum() == -1;
650    final int offset = dst.getOffset(), start = dst.getPosition();
651
652    if (isNeg) { /* Small negative number: 0x14, -E, ~M */
653      dst.put(NEG_SMALL);
654    } else { /* Small positive number: 0x16, ~-E, M */
655      dst.put(POS_SMALL);
656    }
657
658    // normalize abs(val) to determine E
659    int zerosBeforeFirstNonZero = abs.scale() - abs.precision();
660    int lengthToMoveRight =
661      zerosBeforeFirstNonZero % 2 == 0 ? zerosBeforeFirstNonZero : zerosBeforeFirstNonZero - 1;
662    int e = lengthToMoveRight / 2;
663    abs = abs.movePointRight(lengthToMoveRight);
664
665    putVaruint64(dst, e, !isNeg); // encode appropriate E value.
666
667    // encode M by peeling off centimal digits, encoding x as 2x+1
668    int startM = dst.getPosition();
669    encodeToCentimal(dst, abs);
670    // terminal digit should be 2x
671    a[offset + dst.getPosition() - 1] = (byte) (a[offset + dst.getPosition() - 1] & 0xfe);
672    if (isNeg) {
673      // negative values encoded as ~M
674      DESCENDING.apply(a, offset + startM, dst.getPosition() - startM);
675    }
676    return dst.getPosition() - start;
677  }
678
679  /**
680   * Encode the large magnitude floating point number {@code val} using the key encoding. The caller
681   * guarantees that {@code val} will be finite and abs(val) >= 1.0.
682   * <p>
683   * A floating point value is encoded as an integer exponent {@code E} and a mantissa {@code M}.
684   * The original value is equal to {@code (M * 100^E)}. {@code E} is set to the smallest value
685   * possible without making {@code M} greater than or equal to 1.0.
686   * </p>
687   * <p>
688   * Each centimal digit of the mantissa is stored in a byte. If the value of the centimal digit is
689   * {@code X} (hence {@code X>=0} and {@code X<=99}) then the byte value will be {@code 2*X+1} for
690   * every byte of the mantissa, except for the last byte which will be {@code 2*X+0}. The mantissa
691   * must be the minimum number of bytes necessary to represent the value; trailing {@code X==0}
692   * digits are omitted. This means that the mantissa will never contain a byte with the value
693   * {@code 0x00}.
694   * </p>
695   * <p>
696   * If {@code E > 10}, then this routine writes of {@code E} as a varint followed by the mantissa
697   * as described above. Otherwise, if {@code E <= 10}, this routine only writes the mantissa and
698   * leaves the {@code E} value to be encoded as part of the opening byte of the field by the
699   * calling function.
700   *
701   * <pre>
702   *   Encoding:  M       (if E&lt;=10)
703   *              E M     (if E&gt;10)
704   * </pre>
705   * </p>
706   * @param dst The destination to which encoded digits are written.
707   * @param val The value to encode.
708   * @return the number of bytes written.
709   */
710  private static int encodeNumericLarge(PositionedByteRange dst, BigDecimal val) {
711    // TODO: this can be done faster
712    BigDecimal abs = val.abs();
713    byte[] a = dst.getBytes();
714    boolean isNeg = val.signum() == -1;
715    final int start = dst.getPosition(), offset = dst.getOffset();
716
717    if (isNeg) { /* Large negative number: 0x08, ~E, ~M */
718      dst.put(NEG_LARGE);
719    } else { /* Large positive number: 0x22, E, M */
720      dst.put(POS_LARGE);
721    }
722
723    // normalize abs(val) to determine E
724    int integerDigits = abs.precision() - abs.scale();
725    int lengthToMoveLeft = integerDigits % 2 == 0 ? integerDigits : integerDigits + 1;
726    int e = lengthToMoveLeft / 2;
727    abs = abs.movePointLeft(lengthToMoveLeft);
728
729    // encode appropriate header byte and/or E value.
730    if (e > 10) { /* large number, write out {~,}E */
731      putVaruint64(dst, e, isNeg);
732    } else {
733      if (isNeg) { /* Medium negative number: 0x13-E, ~M */
734        dst.put(start, (byte) (NEG_MED_MAX - e));
735      } else { /* Medium positive number: 0x17+E, M */
736        dst.put(start, (byte) (POS_MED_MIN + e));
737      }
738    }
739
740    // encode M by peeling off centimal digits, encoding x as 2x+1
741    int startM = dst.getPosition();
742    encodeToCentimal(dst, abs);
743    // terminal digit should be 2x
744    a[offset + dst.getPosition() - 1] = (byte) (a[offset + dst.getPosition() - 1] & 0xfe);
745    if (isNeg) {
746      // negative values encoded as ~M
747      DESCENDING.apply(a, offset + startM, dst.getPosition() - startM);
748    }
749    return dst.getPosition() - start;
750  }
751
752  /**
753   * Encode a value val in [0.01, 1.0) into Centimals. Util function for
754   * {@link OrderedBytes#encodeNumericLarge(PositionedByteRange, BigDecimal)} and
755   * {@link OrderedBytes#encodeNumericSmall(PositionedByteRange, BigDecimal)}
756   * @param dst The destination to which encoded digits are written.
757   * @param val A BigDecimal after the normalization. The value must be in [0.01, 1.0).
758   */
759  private static void encodeToCentimal(PositionedByteRange dst, BigDecimal val) {
760    // The input value val must be in [0.01, 1.0)
761    String stringOfAbs = val.stripTrailingZeros().toPlainString();
762    String value = stringOfAbs.substring(stringOfAbs.indexOf('.') + 1);
763    int d;
764
765    // If the first float digit is 0, we will encode one digit more than MAX_PRECISION
766    // We encode at most MAX_PRECISION significant digits into centimals,
767    // because the input value, has been already normalized.
768    int maxPrecision = value.charAt(0) == '0' ? MAX_PRECISION + 1 : MAX_PRECISION;
769    maxPrecision = Math.min(maxPrecision, value.length());
770    for (int i = 0; i < maxPrecision; i += 2) {
771      d = (value.charAt(i) - '0') * 10;
772      if (i + 1 < maxPrecision) {
773        d += (value.charAt(i + 1) - '0');
774      }
775      dst.put((byte) (2 * d + 1));
776    }
777  }
778
779  /**
780   * Encode a numerical value using the variable-length encoding.
781   * @param dst The destination to which encoded digits are written.
782   * @param val The value to encode.
783   * @param ord The {@link Order} to respect while encoding {@code val}.
784   * @return the number of bytes written.
785   */
786  public static int encodeNumeric(PositionedByteRange dst, long val, Order ord) {
787    return encodeNumeric(dst, BigDecimal.valueOf(val), ord);
788  }
789
790  /**
791   * Encode a numerical value using the variable-length encoding.
792   * @param dst The destination to which encoded digits are written.
793   * @param val The value to encode.
794   * @param ord The {@link Order} to respect while encoding {@code val}.
795   * @return the number of bytes written.
796   */
797  public static int encodeNumeric(PositionedByteRange dst, double val, Order ord) {
798    if (val == 0.0) {
799      dst.put(ord.apply(ZERO));
800      return 1;
801    }
802    if (Double.isNaN(val)) {
803      dst.put(ord.apply(NAN));
804      return 1;
805    }
806    if (val == Double.NEGATIVE_INFINITY) {
807      dst.put(ord.apply(NEG_INF));
808      return 1;
809    }
810    if (val == Double.POSITIVE_INFINITY) {
811      dst.put(ord.apply(POS_INF));
812      return 1;
813    }
814    return encodeNumeric(dst, BigDecimal.valueOf(val), ord);
815  }
816
817  /**
818   * Encode a numerical value using the variable-length encoding. If the number of significant
819   * digits of the value exceeds the {@link OrderedBytes#MAX_PRECISION}, the exceeding part will be
820   * lost.
821   * @param dst The destination to which encoded digits are written.
822   * @param val The value to encode.
823   * @param ord The {@link Order} to respect while encoding {@code val}.
824   * @return the number of bytes written.
825   */
826  public static int encodeNumeric(PositionedByteRange dst, BigDecimal val, Order ord) {
827    final int len, offset = dst.getOffset(), start = dst.getPosition();
828    if (null == val) {
829      return encodeNull(dst, ord);
830    } else if (BigDecimal.ZERO.compareTo(val) == 0) {
831      dst.put(ord.apply(ZERO));
832      return 1;
833    }
834    BigDecimal abs = val.abs();
835    if (BigDecimal.ONE.compareTo(abs) <= 0) { // abs(v) >= 1.0
836      len = encodeNumericLarge(dst, normalize(val));
837    } else { // 1.0 > abs(v) >= 0.0
838      len = encodeNumericSmall(dst, normalize(val));
839    }
840    ord.apply(dst.getBytes(), offset + start, len);
841    return len;
842  }
843
844  /**
845   * Decode a {@link BigDecimal} from {@code src}. Assumes {@code src} encodes a value in Numeric
846   * encoding and is within the valid range of {@link BigDecimal} values. {@link BigDecimal} does
   * not support {@code NaN} or {@code Infinite} values.
848   * @see #decodeNumericAsDouble(PositionedByteRange)
849   */
850  private static BigDecimal decodeNumericValue(PositionedByteRange src) {
851    final int e;
852    byte header = src.get();
853    boolean dsc = -1 == Integer.signum(header);
854    header = dsc ? DESCENDING.apply(header) : header;
855
856    if (header == NULL) return null;
857    if (header == NEG_LARGE) { /* Large negative number: 0x08, ~E, ~M */
858      e = (int) getVaruint64(src, !dsc);
859      return decodeSignificand(src, e, !dsc).negate();
860    }
861    if (header >= NEG_MED_MIN && header <= NEG_MED_MAX) {
862      /* Medium negative number: 0x13-E, ~M */
863      e = NEG_MED_MAX - header;
864      return decodeSignificand(src, e, !dsc).negate();
865    }
866    if (header == NEG_SMALL) { /* Small negative number: 0x14, -E, ~M */
867      e = (int) -getVaruint64(src, dsc);
868      return decodeSignificand(src, e, !dsc).negate();
869    }
870    if (header == ZERO) {
871      return BigDecimal.ZERO;
872    }
873    if (header == POS_SMALL) { /* Small positive number: 0x16, ~-E, M */
874      e = (int) -getVaruint64(src, !dsc);
875      return decodeSignificand(src, e, dsc);
876    }
877    if (header >= POS_MED_MIN && header <= POS_MED_MAX) {
878      /* Medium positive number: 0x17+E, M */
879      e = header - POS_MED_MIN;
880      return decodeSignificand(src, e, dsc);
881    }
882    if (header == POS_LARGE) { /* Large positive number: 0x22, E, M */
883      e = (int) getVaruint64(src, dsc);
884      return decodeSignificand(src, e, dsc);
885    }
886    throw unexpectedHeader(header);
887  }
888
889  /**
890   * Decode a primitive {@code double} value from the Numeric encoding. Numeric encoding is based on
891   * {@link BigDecimal}; in the event the encoded value is larger than can be represented in a
892   * {@code double}, this method performs an implicit narrowing conversion as described in
893   * {@link BigDecimal#doubleValue()}.
894   * @throws NullPointerException     when the encoded value is {@code NULL}.
895   * @throws IllegalArgumentException when the encoded value is not a Numeric.
896   * @see #encodeNumeric(PositionedByteRange, double, Order)
897   * @see BigDecimal#doubleValue()
898   */
899  public static double decodeNumericAsDouble(PositionedByteRange src) {
900    // TODO: should an encoded NULL value throw unexpectedHeader() instead?
901    if (isNull(src)) {
902      throw new NullPointerException("A null value cannot be decoded to a double.");
903    }
904    if (isNumericNaN(src)) {
905      src.get();
906      return Double.NaN;
907    }
908    if (isNumericZero(src)) {
909      src.get();
910      return Double.valueOf(0.0);
911    }
912
913    byte header = -1 == Integer.signum(src.peek()) ? DESCENDING.apply(src.peek()) : src.peek();
914
915    if (header == NEG_INF) {
916      src.get();
917      return Double.NEGATIVE_INFINITY;
918    } else if (header == POS_INF) {
919      src.get();
920      return Double.POSITIVE_INFINITY;
921    } else {
922      return decodeNumericValue(src).doubleValue();
923    }
924  }
925
926  /**
927   * Decode a primitive {@code long} value from the Numeric encoding. Numeric encoding is based on
928   * {@link BigDecimal}; in the event the encoded value is larger than can be represented in a
929   * {@code long}, this method performs an implicit narrowing conversion as described in
   * {@link BigDecimal#longValue()}.
931   * @throws NullPointerException     when the encoded value is {@code NULL}.
932   * @throws IllegalArgumentException when the encoded value is not a Numeric.
933   * @see #encodeNumeric(PositionedByteRange, long, Order)
934   * @see BigDecimal#longValue()
935   */
936  public static long decodeNumericAsLong(PositionedByteRange src) {
937    // TODO: should an encoded NULL value throw unexpectedHeader() instead?
938    if (isNull(src)) throw new NullPointerException();
939    if (!isNumeric(src)) throw unexpectedHeader(src.peek());
940    if (isNumericNaN(src)) throw unexpectedHeader(src.peek());
941    if (isNumericInfinite(src)) throw unexpectedHeader(src.peek());
942
943    if (isNumericZero(src)) {
944      src.get();
945      return Long.valueOf(0);
946    }
947    return decodeNumericValue(src).longValue();
948  }
949
950  /**
951   * Decode a {@link BigDecimal} value from the variable-length encoding.
952   * @throws IllegalArgumentException when the encoded value is not a Numeric.
953   * @see #encodeNumeric(PositionedByteRange, BigDecimal, Order)
954   */
955  public static BigDecimal decodeNumericAsBigDecimal(PositionedByteRange src) {
956    if (isNull(src)) {
957      src.get();
958      return null;
959    }
960    if (!isNumeric(src)) throw unexpectedHeader(src.peek());
961    if (isNumericNaN(src)) throw unexpectedHeader(src.peek());
962    if (isNumericInfinite(src)) throw unexpectedHeader(src.peek());
963    return decodeNumericValue(src);
964  }
965
966  /**
967   * Encode a String value. String encoding is 0x00-terminated and so it does not support
968   * {@code \u0000} codepoints in the value.
969   * @param dst The destination to which the encoded value is written.
970   * @param val The value to encode.
971   * @param ord The {@link Order} to respect while encoding {@code val}.
972   * @return the number of bytes written.
973   * @throws IllegalArgumentException when {@code val} contains a {@code \u0000}.
974   */
975  public static int encodeString(PositionedByteRange dst, String val, Order ord) {
976    if (null == val) {
977      return encodeNull(dst, ord);
978    }
979    if (val.contains("\u0000"))
980      throw new IllegalArgumentException("Cannot encode String values containing '\\u0000'");
981    final int offset = dst.getOffset(), start = dst.getPosition();
982    dst.put(TEXT);
983    // TODO: is there no way to decode into dst directly?
984    dst.put(val.getBytes(UTF8));
985    dst.put(TERM);
986    ord.apply(dst.getBytes(), offset + start, dst.getPosition() - start);
987    return dst.getPosition() - start;
988  }
989
990  /**
991   * Decode a String value.
992   */
993  public static String decodeString(PositionedByteRange src) {
994    final byte header = src.get();
995    if (header == NULL || header == DESCENDING.apply(NULL)) return null;
996    assert header == TEXT || header == DESCENDING.apply(TEXT);
997    Order ord = header == TEXT ? ASCENDING : DESCENDING;
998    byte[] a = src.getBytes();
999    final int offset = src.getOffset(), start = src.getPosition();
1000    final byte terminator = ord.apply(TERM);
1001    int rawStartPos = offset + start, rawTermPos = rawStartPos;
1002    for (; a[rawTermPos] != terminator; rawTermPos++)
1003      ;
1004    src.setPosition(rawTermPos - offset + 1); // advance position to TERM + 1
1005    if (DESCENDING == ord) {
1006      // make a copy so that we don't disturb encoded value with ord.
1007      byte[] copy = new byte[rawTermPos - rawStartPos];
1008      System.arraycopy(a, rawStartPos, copy, 0, copy.length);
1009      ord.apply(copy);
1010      return new String(copy, UTF8);
1011    } else {
1012      return new String(a, rawStartPos, rawTermPos - rawStartPos, UTF8);
1013    }
1014  }
1015
1016  /**
1017   * Calculate the expected BlobVar encoded length based on unencoded length.
1018   */
1019  public static int blobVarEncodedLength(int len) {
1020    if (0 == len) return 2; // 1-byte header + 1-byte terminator
1021    else return (int) Math.ceil((len * 8) // 8-bits per input byte
1022      / 7.0) // 7-bits of input data per encoded byte, rounded up
1023      + 1; // + 1-byte header
1024  }
1025
1026  /**
1027   * Calculate the expected BlobVar decoded length based on encoded length.
1028   */
1029  @InterfaceAudience.Private
1030  static int blobVarDecodedLength(int len) {
1031    return ((len - 1) // 1-byte header
1032      * 7) // 7-bits of payload per encoded byte
1033      / 8; // 8-bits per byte
1034  }
1035
1036  /**
1037   * Encode a Blob value using a modified varint encoding scheme.
1038   * <p>
1039   * This format encodes a byte[] value such that no limitations on the input value are imposed. The
1040   * first byte encodes the encoding scheme that follows, {@link #BLOB_VAR}. Each encoded byte
1041   * thereafter consists of a header bit followed by 7 bits of payload. A header bit of '1'
1042   * indicates continuation of the encoding. A header bit of '0' indicates this byte contains the
1043   * last of the payload. An empty input value is encoded as the header byte immediately followed by
   * a termination byte {@code 0x00}. This is not ambiguous with the encoded value of
   * {@code [0x00]}, which results in {@code [0x80, 0x00]}.
1046   * </p>
1047   * @return the number of bytes written.
1048   */
1049  public static int encodeBlobVar(PositionedByteRange dst, byte[] val, int voff, int vlen,
1050    Order ord) {
1051    if (null == val) {
1052      return encodeNull(dst, ord);
1053    }
1054    // Empty value is null-terminated. All other values are encoded as 7-bits per byte.
1055    assert dst.getRemaining() >= blobVarEncodedLength(vlen) : "buffer overflow expected.";
1056    final int offset = dst.getOffset(), start = dst.getPosition();
1057    dst.put(BLOB_VAR);
1058    if (0 == vlen) {
1059      dst.put(TERM);
1060    } else {
1061      byte s = 1, t = 0;
1062      for (int i = voff; i < vlen; i++) {
1063        dst.put((byte) (0x80 | t | ((val[i] & 0xff) >>> s)));
1064        if (s < 7) {
1065          t = (byte) (val[i] << (7 - s));
1066          s++;
1067        } else {
1068          dst.put((byte) (0x80 | val[i]));
1069          s = 1;
1070          t = 0;
1071        }
1072      }
1073      if (s > 1) {
1074        dst.put((byte) (0x7f & t));
1075      } else {
1076        dst.getBytes()[offset + dst.getPosition() - 1] =
1077          (byte) (dst.getBytes()[offset + dst.getPosition() - 1] & 0x7f);
1078      }
1079    }
1080    ord.apply(dst.getBytes(), offset + start, dst.getPosition() - start);
1081    return dst.getPosition() - start;
1082  }
1083
1084  /**
1085   * Encode a blob value using a modified varint encoding scheme.
1086   * @return the number of bytes written.
1087   * @see #encodeBlobVar(PositionedByteRange, byte[], int, int, Order)
1088   */
1089  public static int encodeBlobVar(PositionedByteRange dst, byte[] val, Order ord) {
1090    return encodeBlobVar(dst, val, 0, null != val ? val.length : 0, ord);
1091  }
1092
1093  /**
1094   * Decode a blob value that was encoded using BlobVar encoding.
1095   */
  public static byte[] decodeBlobVar(PositionedByteRange src) {
    final byte header = src.get();
    // An encoded NULL (in either order) decodes to null; only the header byte is consumed.
    if (header == NULL || header == DESCENDING.apply(NULL)) {
      return null;
    }
    assert header == BLOB_VAR || header == DESCENDING.apply(BLOB_VAR);
    // The order is inferred from the header: anything other than BLOB_VAR is taken as DESCENDING.
    Order ord = BLOB_VAR == header ? ASCENDING : DESCENDING;
    if (src.peek() == ord.apply(TERM)) {
      // skip empty input buffer.
      src.get();
      return new byte[0];
    }
    final int offset = src.getOffset(), start = src.getPosition();
    int end;
    byte[] a = src.getBytes();
    // Scan forward until the byte whose 0x80 bit, after applying ord, equals TERM; that byte is
    // the last one of the encoded payload.
    for (end = start; (byte) (ord.apply(a[offset + end]) & 0x80) != TERM; end++)
      ;
    end++; // increment end to 1-past last byte
    // create ret buffer using length of encoded data + 1 (header byte)
    PositionedByteRange ret =
      new SimplePositionedMutableByteRange(blobVarDecodedLength(end - start + 1));
    // s is the right-shift applied to the 7 payload bits of the current encoded byte.
    // t carries bits of the previous encoded byte, shifted up into the high end of the next
    // output byte.
    int s = 6;
    byte t = (byte) ((ord.apply(a[offset + start]) << 1) & 0xff);
    for (int i = start + 1; i < end; i++) {
      if (s == 7) {
        // All 7 payload bits of this encoded byte complete the current output byte, so nothing
        // of it is left to carry; step past it.
        ret.put((byte) (t | (ord.apply(a[offset + i]) & 0x7f)));
        i++;
        // explicitly reset t -- clean up overflow buffer after decoding
        // a full cycle and retain assertion condition below. This happens
        t = 0; // when the LSB in the last encoded byte is 1. (HBASE-9893)
      } else {
        ret.put((byte) (t | ((ord.apply(a[offset + i]) & 0x7f) >>> s)));
      }
      // The extra i++ above can land exactly on end; stop before reading past the payload.
      if (i == end) break;
      t = (byte) ((ord.apply(a[offset + i]) << (8 - s)) & 0xff);
      // The shift cycles 6, 5, ..., 1 and then 7.
      s = s == 1 ? 7 : s - 1;
    }
    // Leave src positioned just past the last encoded byte.
    src.setPosition(end);
    assert t == 0 : "Unexpected bits remaining after decoding blob.";
    assert ret.getPosition() == ret.getLength() : "Allocated unnecessarily large return buffer.";
    return ret.getBytes();
  }
1138
1139  /**
1140   * Encode a Blob value as a byte-for-byte copy. BlobCopy encoding in DESCENDING order is NULL
1141   * terminated so as to preserve proper sorting of {@code []} and so it does not support
1142   * {@code 0x00} in the value.
1143   * @return the number of bytes written.
1144   * @throws IllegalArgumentException when {@code ord} is DESCENDING and {@code val} contains a
1145   *                                  {@code 0x00} byte.
1146   */
1147  public static int encodeBlobCopy(PositionedByteRange dst, byte[] val, int voff, int vlen,
1148    Order ord) {
1149    if (null == val) {
1150      encodeNull(dst, ord);
1151      if (ASCENDING == ord) return 1;
1152      else {
1153        // DESCENDING ordered BlobCopy requires a termination bit to preserve
1154        // sort-order semantics of null values.
1155        dst.put(ord.apply(TERM));
1156        return 2;
1157      }
1158    }
1159    // Blobs as final entry in a compound key are written unencoded.
1160    assert dst.getRemaining() >= vlen + (ASCENDING == ord ? 1 : 2);
1161    if (DESCENDING == ord) {
1162      for (int i = 0; i < vlen; i++) {
1163        if (TERM == val[voff + i]) {
1164          throw new IllegalArgumentException("0x00 bytes not permitted in value.");
1165        }
1166      }
1167    }
1168    final int offset = dst.getOffset(), start = dst.getPosition();
1169    dst.put(BLOB_COPY);
1170    dst.put(val, voff, vlen);
1171    // DESCENDING ordered BlobCopy requires a termination bit to preserve
1172    // sort-order semantics of null values.
1173    if (DESCENDING == ord) dst.put(TERM);
1174    ord.apply(dst.getBytes(), offset + start, dst.getPosition() - start);
1175    return dst.getPosition() - start;
1176  }
1177
1178  /**
1179   * Encode a Blob value as a byte-for-byte copy. BlobCopy encoding in DESCENDING order is NULL
1180   * terminated so as to preserve proper sorting of {@code []} and so it does not support
1181   * {@code 0x00} in the value.
1182   * @return the number of bytes written.
1183   * @throws IllegalArgumentException when {@code ord} is DESCENDING and {@code val} contains a
1184   *                                  {@code 0x00} byte.
1185   * @see #encodeBlobCopy(PositionedByteRange, byte[], int, int, Order)
1186   */
1187  public static int encodeBlobCopy(PositionedByteRange dst, byte[] val, Order ord) {
1188    return encodeBlobCopy(dst, val, 0, null != val ? val.length : 0, ord);
1189  }
1190
1191  /**
1192   * Decode a Blob value, byte-for-byte copy.
1193   * @see #encodeBlobCopy(PositionedByteRange, byte[], int, int, Order)
1194   */
1195  public static byte[] decodeBlobCopy(PositionedByteRange src) {
1196    byte header = src.get();
1197    if (header == NULL || header == DESCENDING.apply(NULL)) {
1198      return null;
1199    }
1200    assert header == BLOB_COPY || header == DESCENDING.apply(BLOB_COPY);
1201    Order ord = header == BLOB_COPY ? ASCENDING : DESCENDING;
1202    final int length = src.getRemaining() - (ASCENDING == ord ? 0 : 1);
1203    byte[] ret = new byte[length];
1204    src.get(ret);
1205    ord.apply(ret, 0, ret.length);
1206    // DESCENDING ordered BlobCopy requires a termination bit to preserve
1207    // sort-order semantics of null values.
1208    if (DESCENDING == ord) src.get();
1209    return ret;
1210  }
1211
1212  /**
1213   * Encode a null value.
1214   * @param dst The destination to which encoded digits are written.
1215   * @param ord The {@link Order} to respect while encoding {@code val}.
1216   * @return the number of bytes written.
1217   */
1218  public static int encodeNull(PositionedByteRange dst, Order ord) {
1219    dst.put(ord.apply(NULL));
1220    return 1;
1221  }
1222
1223  /**
1224   * Encode an {@code int8} value using the fixed-length encoding.
1225   * @return the number of bytes written.
1226   * @see #encodeInt64(PositionedByteRange, long, Order)
1227   * @see #decodeInt8(PositionedByteRange)
1228   */
1229  public static int encodeInt8(PositionedByteRange dst, byte val, Order ord) {
1230    final int offset = dst.getOffset(), start = dst.getPosition();
1231    dst.put(FIXED_INT8).put((byte) (val ^ 0x80));
1232    ord.apply(dst.getBytes(), offset + start, 2);
1233    return 2;
1234  }
1235
1236  /**
1237   * Decode an {@code int8} value.
1238   * @see #encodeInt8(PositionedByteRange, byte, Order)
1239   */
1240  public static byte decodeInt8(PositionedByteRange src) {
1241    final byte header = src.get();
1242    assert header == FIXED_INT8 || header == DESCENDING.apply(FIXED_INT8);
1243    Order ord = header == FIXED_INT8 ? ASCENDING : DESCENDING;
1244    return (byte) ((ord.apply(src.get()) ^ 0x80) & 0xff);
1245  }
1246
1247  /**
1248   * Encode an {@code int16} value using the fixed-length encoding.
1249   * @return the number of bytes written.
1250   * @see #encodeInt64(PositionedByteRange, long, Order)
1251   * @see #decodeInt16(PositionedByteRange)
1252   */
1253  public static int encodeInt16(PositionedByteRange dst, short val, Order ord) {
1254    final int offset = dst.getOffset(), start = dst.getPosition();
1255    dst.put(FIXED_INT16).put((byte) ((val >> 8) ^ 0x80)).put((byte) val);
1256    ord.apply(dst.getBytes(), offset + start, 3);
1257    return 3;
1258  }
1259
1260  /**
1261   * Decode an {@code int16} value.
1262   * @see #encodeInt16(PositionedByteRange, short, Order)
1263   */
1264  public static short decodeInt16(PositionedByteRange src) {
1265    final byte header = src.get();
1266    assert header == FIXED_INT16 || header == DESCENDING.apply(FIXED_INT16);
1267    Order ord = header == FIXED_INT16 ? ASCENDING : DESCENDING;
1268    short val = (short) ((ord.apply(src.get()) ^ 0x80) & 0xff);
1269    val = (short) ((val << 8) + (ord.apply(src.get()) & 0xff));
1270    return val;
1271  }
1272
1273  /**
1274   * Encode an {@code int32} value using the fixed-length encoding.
1275   * @return the number of bytes written.
1276   * @see #encodeInt64(PositionedByteRange, long, Order)
1277   * @see #decodeInt32(PositionedByteRange)
1278   */
1279  public static int encodeInt32(PositionedByteRange dst, int val, Order ord) {
1280    final int offset = dst.getOffset(), start = dst.getPosition();
1281    dst.put(FIXED_INT32).put((byte) ((val >> 24) ^ 0x80)).put((byte) (val >> 16))
1282      .put((byte) (val >> 8)).put((byte) val);
1283    ord.apply(dst.getBytes(), offset + start, 5);
1284    return 5;
1285  }
1286
1287  /**
1288   * Decode an {@code int32} value.
1289   * @see #encodeInt32(PositionedByteRange, int, Order)
1290   */
1291  public static int decodeInt32(PositionedByteRange src) {
1292    final byte header = src.get();
1293    assert header == FIXED_INT32 || header == DESCENDING.apply(FIXED_INT32);
1294    Order ord = header == FIXED_INT32 ? ASCENDING : DESCENDING;
1295    int val = (ord.apply(src.get()) ^ 0x80) & 0xff;
1296    for (int i = 1; i < 4; i++) {
1297      val = (val << 8) + (ord.apply(src.get()) & 0xff);
1298    }
1299    return val;
1300  }
1301
1302  /**
1303   * Encode an {@code int64} value using the fixed-length encoding.
1304   * <p>
1305   * This format ensures that all longs sort in their natural order, as they would sort when using
1306   * signed long comparison.
1307   * </p>
1308   * <p>
1309   * All Longs are serialized to an 8-byte, fixed-width sortable byte format. Serialization is
1310   * performed by inverting the integer sign bit and writing the resulting bytes to the byte array
1311   * in big endian order. The encoded value is prefixed by the {@link #FIXED_INT64} header byte.
1312   * This encoding is designed to handle java language primitives and so Null values are NOT
1313   * supported by this implementation.
1314   * </p>
1315   * <p>
1316   * For example:
1317   * </p>
1318   *
1319   * <pre>
1320   * Input:   0x0000000000000005 (5)
1321   * Result:  0x288000000000000005
1322   *
   * Input:   0xfffffffffffffffb (-5)
   * Result:  0x287ffffffffffffffb
   *
   * Input:   0x7fffffffffffffff (Long.MAX_VALUE)
   * Result:  0x28ffffffffffffffff
   *
   * Input:   0x8000000000000000 (Long.MIN_VALUE)
   * Result:  0x280000000000000000
1331   * </pre>
1332   * <p>
1333   * This encoding format, and much of this documentation string, is based on Orderly's
1334   * {@code FixedIntWritableRowKey}.
1335   * </p>
1336   * @return the number of bytes written.
1337   * @see #decodeInt64(PositionedByteRange)
1338   */
1339  public static int encodeInt64(PositionedByteRange dst, long val, Order ord) {
1340    final int offset = dst.getOffset(), start = dst.getPosition();
1341    dst.put(FIXED_INT64).put((byte) ((val >> 56) ^ 0x80)).put((byte) (val >> 48))
1342      .put((byte) (val >> 40)).put((byte) (val >> 32)).put((byte) (val >> 24))
1343      .put((byte) (val >> 16)).put((byte) (val >> 8)).put((byte) val);
1344    ord.apply(dst.getBytes(), offset + start, 9);
1345    return 9;
1346  }
1347
1348  /**
1349   * Decode an {@code int64} value.
1350   * @see #encodeInt64(PositionedByteRange, long, Order)
1351   */
1352  public static long decodeInt64(PositionedByteRange src) {
1353    final byte header = src.get();
1354    assert header == FIXED_INT64 || header == DESCENDING.apply(FIXED_INT64);
1355    Order ord = header == FIXED_INT64 ? ASCENDING : DESCENDING;
1356    long val = (ord.apply(src.get()) ^ 0x80) & 0xff;
1357    for (int i = 1; i < 8; i++) {
1358      val = (val << 8) + (ord.apply(src.get()) & 0xff);
1359    }
1360    return val;
1361  }
1362
1363  /**
1364   * Encode a 32-bit floating point value using the fixed-length encoding. Encoding format is
1365   * described at length in {@link #encodeFloat64(PositionedByteRange, double, Order)}.
1366   * @return the number of bytes written.
1367   * @see #decodeFloat32(PositionedByteRange)
1368   * @see #encodeFloat64(PositionedByteRange, double, Order)
1369   */
1370  public static int encodeFloat32(PositionedByteRange dst, float val, Order ord) {
1371    final int offset = dst.getOffset(), start = dst.getPosition();
1372    int i = Float.floatToIntBits(val);
1373    i ^= ((i >> (Integer.SIZE - 1)) | Integer.MIN_VALUE);
1374    dst.put(FIXED_FLOAT32).put((byte) (i >> 24)).put((byte) (i >> 16)).put((byte) (i >> 8))
1375      .put((byte) i);
1376    ord.apply(dst.getBytes(), offset + start, 5);
1377    return 5;
1378  }
1379
1380  /**
1381   * Decode a 32-bit floating point value using the fixed-length encoding.
1382   * @see #encodeFloat32(PositionedByteRange, float, Order)
1383   */
1384  public static float decodeFloat32(PositionedByteRange src) {
1385    final byte header = src.get();
1386    assert header == FIXED_FLOAT32 || header == DESCENDING.apply(FIXED_FLOAT32);
1387    Order ord = header == FIXED_FLOAT32 ? ASCENDING : DESCENDING;
1388    int val = ord.apply(src.get()) & 0xff;
1389    for (int i = 1; i < 4; i++) {
1390      val = (val << 8) + (ord.apply(src.get()) & 0xff);
1391    }
1392    val ^= (~val >> (Integer.SIZE - 1)) | Integer.MIN_VALUE;
1393    return Float.intBitsToFloat(val);
1394  }
1395
1396  /**
1397   * Encode a 64-bit floating point value using the fixed-length encoding.
1398   * <p>
1399   * This format ensures the following total ordering of floating point values:
1400   * Double.NEGATIVE_INFINITY &lt; -Double.MAX_VALUE &lt; ... &lt; -Double.MIN_VALUE &lt; -0.0 &lt;
   * +0.0 &lt; Double.MIN_VALUE &lt; ... &lt; Double.MAX_VALUE &lt; Double.POSITIVE_INFINITY &lt;
1402   * Double.NaN
1403   * </p>
1404   * <p>
1405   * Floating point numbers are encoded as specified in IEEE 754. A 64-bit double precision float
1406   * consists of a sign bit, 11-bit unsigned exponent encoded in offset-1023 notation, and a 52-bit
1407   * significand. The format is described further in the
1408   * <a href="http://en.wikipedia.org/wiki/Double_precision"> Double Precision Floating Point
1409   * Wikipedia page</a>
1410   * </p>
1411   * <p>
1412   * The value of a normal float is -1 <sup>sign bit</sup> &times; 2<sup>exponent - 1023</sup>
1413   * &times; 1.significand
1414   * </p>
1415   * <p>
   * The IEEE 754 floating point format already preserves sort ordering for positive floating point
1417   * numbers when the raw bytes are compared in most significant byte order. This is discussed
1418   * further at
1419   * <a href= "http://www.cygnus-software.com/papers/comparingfloats/comparingfloats.htm" >
   * http://www.cygnus-software.com/papers/comparingfloats/comparingfloats.htm</a>
1421   * </p>
1422   * <p>
   * Thus, we need only ensure that negative numbers sort in the exact opposite order as
1424   * positive numbers (so that say, negative infinity is less than negative 1), and that all
1425   * negative numbers compare less than any positive number. To accomplish this, we invert the sign
1426   * bit of all floating point numbers, and we also invert the exponent and significand bits if the
1427   * floating point number was negative.
1428   * </p>
1429   * <p>
1430   * More specifically, we first store the floating point bits into a 64-bit long {@code l} using
1431   * {@link Double#doubleToLongBits}. This method collapses all NaNs into a single, canonical NaN
1432   * value but otherwise leaves the bits unchanged. We then compute
1433   * </p>
1434   *
1435   * <pre>
   * l &circ;= (l &gt;&gt; (Long.SIZE - 1)) | Long.MIN_VALUE
1437   * </pre>
1438   * <p>
1439   * which inverts the sign bit and XOR's all other bits with the sign bit itself. Comparing the raw
1440   * bytes of {@code l} in most significant byte order is equivalent to performing a double
1441   * precision floating point comparison on the underlying bits (ignoring NaN comparisons, as NaNs
1442   * don't compare equal to anything when performing floating point comparisons).
1443   * </p>
1444   * <p>
1445   * The resulting long integer is then converted into a byte array by serializing the long one byte
1446   * at a time in most significant byte order. The serialized integer is prefixed by a single header
1447   * byte. All serialized values are 9 bytes in length.
1448   * </p>
1449   * <p>
1450   * This encoding format, and much of this highly detailed documentation string, is based on
1451   * Orderly's {@code DoubleWritableRowKey}.
1452   * </p>
1453   * @return the number of bytes written.
1454   * @see #decodeFloat64(PositionedByteRange)
1455   */
1456  public static int encodeFloat64(PositionedByteRange dst, double val, Order ord) {
1457    final int offset = dst.getOffset(), start = dst.getPosition();
1458    long lng = Double.doubleToLongBits(val);
1459    lng ^= ((lng >> (Long.SIZE - 1)) | Long.MIN_VALUE);
1460    dst.put(FIXED_FLOAT64).put((byte) (lng >> 56)).put((byte) (lng >> 48)).put((byte) (lng >> 40))
1461      .put((byte) (lng >> 32)).put((byte) (lng >> 24)).put((byte) (lng >> 16))
1462      .put((byte) (lng >> 8)).put((byte) lng);
1463    ord.apply(dst.getBytes(), offset + start, 9);
1464    return 9;
1465  }
1466
1467  /**
1468   * Decode a 64-bit floating point value using the fixed-length encoding.
1469   * @see #encodeFloat64(PositionedByteRange, double, Order)
1470   */
1471  public static double decodeFloat64(PositionedByteRange src) {
1472    final byte header = src.get();
1473    assert header == FIXED_FLOAT64 || header == DESCENDING.apply(FIXED_FLOAT64);
1474    Order ord = header == FIXED_FLOAT64 ? ASCENDING : DESCENDING;
1475    long val = ord.apply(src.get()) & 0xff;
1476    for (int i = 1; i < 8; i++) {
1477      val = (val << 8) + (ord.apply(src.get()) & 0xff);
1478    }
1479    val ^= (~val >> (Long.SIZE - 1)) | Long.MIN_VALUE;
1480    return Double.longBitsToDouble(val);
1481  }
1482
1483  /**
1484   * Returns true when {@code src} appears to be positioned an encoded value, false otherwise.
1485   */
1486  public static boolean isEncodedValue(PositionedByteRange src) {
1487    return isNull(src) || isNumeric(src) || isFixedInt8(src) || isFixedInt16(src)
1488      || isFixedInt32(src) || isFixedInt64(src) || isFixedFloat32(src) || isFixedFloat64(src)
1489      || isText(src) || isBlobCopy(src) || isBlobVar(src);
1490  }
1491
1492  /**
1493   * Return true when the next encoded value in {@code src} is null, false otherwise.
1494   */
1495  public static boolean isNull(PositionedByteRange src) {
1496    return NULL == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1497  }
1498
1499  /**
1500   * Return true when the next encoded value in {@code src} uses Numeric encoding, false otherwise.
1501   * {@code NaN}, {@code +/-Inf} are valid Numeric values.
1502   */
1503  public static boolean isNumeric(PositionedByteRange src) {
1504    byte x = (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1505    return x >= NEG_INF && x <= NAN;
1506  }
1507
1508  /**
1509   * Return true when the next encoded value in {@code src} uses Numeric encoding and is
1510   * {@code Infinite}, false otherwise.
1511   */
1512  public static boolean isNumericInfinite(PositionedByteRange src) {
1513    byte x = (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1514    return NEG_INF == x || POS_INF == x;
1515  }
1516
1517  /**
1518   * Return true when the next encoded value in {@code src} uses Numeric encoding and is
1519   * {@code NaN}, false otherwise.
1520   */
1521  public static boolean isNumericNaN(PositionedByteRange src) {
1522    return NAN == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1523  }
1524
1525  /**
1526   * Return true when the next encoded value in {@code src} uses Numeric encoding and is {@code 0},
1527   * false otherwise.
1528   */
1529  public static boolean isNumericZero(PositionedByteRange src) {
1530    return ZERO == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1531  }
1532
1533  /**
1534   * Return true when the next encoded value in {@code src} uses fixed-width Int8 encoding, false
1535   * otherwise.
1536   */
1537  public static boolean isFixedInt8(PositionedByteRange src) {
1538    return FIXED_INT8
1539        == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1540  }
1541
1542  /**
1543   * Return true when the next encoded value in {@code src} uses fixed-width Int16 encoding, false
1544   * otherwise.
1545   */
1546  public static boolean isFixedInt16(PositionedByteRange src) {
1547    return FIXED_INT16
1548        == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1549  }
1550
1551  /**
1552   * Return true when the next encoded value in {@code src} uses fixed-width Int32 encoding, false
1553   * otherwise.
1554   */
1555  public static boolean isFixedInt32(PositionedByteRange src) {
1556    return FIXED_INT32
1557        == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1558  }
1559
1560  /**
1561   * Return true when the next encoded value in {@code src} uses fixed-width Int64 encoding, false
1562   * otherwise.
1563   */
1564  public static boolean isFixedInt64(PositionedByteRange src) {
1565    return FIXED_INT64
1566        == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1567  }
1568
1569  /**
1570   * Return true when the next encoded value in {@code src} uses fixed-width Float32 encoding, false
1571   * otherwise.
1572   */
1573  public static boolean isFixedFloat32(PositionedByteRange src) {
1574    return FIXED_FLOAT32
1575        == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1576  }
1577
1578  /**
1579   * Return true when the next encoded value in {@code src} uses fixed-width Float64 encoding, false
1580   * otherwise.
1581   */
1582  public static boolean isFixedFloat64(PositionedByteRange src) {
1583    return FIXED_FLOAT64
1584        == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1585  }
1586
1587  /**
1588   * Return true when the next encoded value in {@code src} uses Text encoding, false otherwise.
1589   */
1590  public static boolean isText(PositionedByteRange src) {
1591    return TEXT == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1592  }
1593
1594  /**
1595   * Return true when the next encoded value in {@code src} uses BlobVar encoding, false otherwise.
1596   */
1597  public static boolean isBlobVar(PositionedByteRange src) {
1598    return BLOB_VAR
1599        == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1600  }
1601
1602  /**
1603   * Return true when the next encoded value in {@code src} uses BlobCopy encoding, false otherwise.
1604   */
1605  public static boolean isBlobCopy(PositionedByteRange src) {
1606    return BLOB_COPY
1607        == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1608  }
1609
1610  /**
1611   * Skip {@code buff}'s position forward over one encoded value.
1612   * @return number of bytes skipped.
1613   */
1614  public static int skip(PositionedByteRange src) {
1615    final int start = src.getPosition();
1616    byte header = src.get();
1617    Order ord = (-1 == Integer.signum(header)) ? DESCENDING : ASCENDING;
1618    header = ord.apply(header);
1619
1620    switch (header) {
1621      case NULL:
1622      case NEG_INF:
1623        return 1;
1624      case NEG_LARGE: /* Large negative number: 0x08, ~E, ~M */
1625        skipVaruint64(src, DESCENDING != ord);
1626        skipSignificand(src, DESCENDING != ord);
1627        return src.getPosition() - start;
1628      case NEG_MED_MIN: /* Medium negative number: 0x13-E, ~M */
1629      case NEG_MED_MIN + 0x01:
1630      case NEG_MED_MIN + 0x02:
1631      case NEG_MED_MIN + 0x03:
1632      case NEG_MED_MIN + 0x04:
1633      case NEG_MED_MIN + 0x05:
1634      case NEG_MED_MIN + 0x06:
1635      case NEG_MED_MIN + 0x07:
1636      case NEG_MED_MIN + 0x08:
1637      case NEG_MED_MIN + 0x09:
1638      case NEG_MED_MAX:
1639        skipSignificand(src, DESCENDING != ord);
1640        return src.getPosition() - start;
1641      case NEG_SMALL: /* Small negative number: 0x14, -E, ~M */
1642        skipVaruint64(src, DESCENDING == ord);
1643        skipSignificand(src, DESCENDING != ord);
1644        return src.getPosition() - start;
1645      case ZERO:
1646        return 1;
1647      case POS_SMALL: /* Small positive number: 0x16, ~-E, M */
1648        skipVaruint64(src, DESCENDING != ord);
1649        skipSignificand(src, DESCENDING == ord);
1650        return src.getPosition() - start;
1651      case POS_MED_MIN: /* Medium positive number: 0x17+E, M */
1652      case POS_MED_MIN + 0x01:
1653      case POS_MED_MIN + 0x02:
1654      case POS_MED_MIN + 0x03:
1655      case POS_MED_MIN + 0x04:
1656      case POS_MED_MIN + 0x05:
1657      case POS_MED_MIN + 0x06:
1658      case POS_MED_MIN + 0x07:
1659      case POS_MED_MIN + 0x08:
1660      case POS_MED_MIN + 0x09:
1661      case POS_MED_MAX:
1662        skipSignificand(src, DESCENDING == ord);
1663        return src.getPosition() - start;
1664      case POS_LARGE: /* Large positive number: 0x22, E, M */
1665        skipVaruint64(src, DESCENDING == ord);
1666        skipSignificand(src, DESCENDING == ord);
1667        return src.getPosition() - start;
1668      case POS_INF:
1669        return 1;
1670      case NAN:
1671        return 1;
1672      case FIXED_INT8:
1673        src.setPosition(src.getPosition() + 1);
1674        return src.getPosition() - start;
1675      case FIXED_INT16:
1676        src.setPosition(src.getPosition() + 2);
1677        return src.getPosition() - start;
1678      case FIXED_INT32:
1679        src.setPosition(src.getPosition() + 4);
1680        return src.getPosition() - start;
1681      case FIXED_INT64:
1682        src.setPosition(src.getPosition() + 8);
1683        return src.getPosition() - start;
1684      case FIXED_FLOAT32:
1685        src.setPosition(src.getPosition() + 4);
1686        return src.getPosition() - start;
1687      case FIXED_FLOAT64:
1688        src.setPosition(src.getPosition() + 8);
1689        return src.getPosition() - start;
1690      case TEXT:
1691        // for null-terminated values, skip to the end.
1692        do {
1693          header = ord.apply(src.get());
1694        } while (header != TERM);
1695        return src.getPosition() - start;
1696      case BLOB_VAR:
1697        // read until we find a 0 in the MSB
1698        do {
1699          header = ord.apply(src.get());
1700        } while ((byte) (header & 0x80) != TERM);
1701        return src.getPosition() - start;
1702      case BLOB_COPY:
1703        if (Order.DESCENDING == ord) {
1704          // if descending, read to termination byte.
1705          do {
1706            header = ord.apply(src.get());
1707          } while (header != TERM);
1708          return src.getPosition() - start;
1709        } else {
1710          // otherwise, just skip to the end.
1711          src.setPosition(src.getLength());
1712          return src.getPosition() - start;
1713        }
1714      default:
1715        throw unexpectedHeader(header);
1716    }
1717  }
1718
1719  /**
1720   * Return the number of encoded entries remaining in {@code buff}. The state of {@code buff} is
1721   * not modified through use of this method.
1722   */
1723  public static int length(PositionedByteRange buff) {
1724    PositionedByteRange b =
1725      new SimplePositionedMutableByteRange(buff.getBytes(), buff.getOffset(), buff.getLength());
1726    b.setPosition(buff.getPosition());
1727    int cnt = 0;
1728    for (; isEncodedValue(b); skip(b), cnt++)
1729      ;
1730    return cnt;
1731  }
1732}