port_posix.h 13.2 KB
Newer Older
1 2 3 4 5
//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
J
jorlow@chromium.org 已提交
6 7 8 9 10 11
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// See port_example.h for documentation for the following types/functions.

12 13
#ifndef STORAGE_LEVELDB_PORT_PORT_POSIX_H_
#define STORAGE_LEVELDB_PORT_PORT_POSIX_H_
J
jorlow@chromium.org 已提交
14

H
heyongqiang 已提交
15
#undef PLATFORM_IS_LITTLE_ENDIAN
16
#if defined(OS_MACOSX)
17
  #include <machine/endian.h>
H
heyongqiang 已提交
18 19 20 21
  #if defined(__DARWIN_LITTLE_ENDIAN) && defined(__DARWIN_BYTE_ORDER)
    #define PLATFORM_IS_LITTLE_ENDIAN \
        (__DARWIN_BYTE_ORDER == __DARWIN_LITTLE_ENDIAN)
  #endif
22 23 24
#elif defined(OS_SOLARIS)
  #include <sys/isa_defs.h>
  #ifdef _LITTLE_ENDIAN
H
heyongqiang 已提交
25
    #define PLATFORM_IS_LITTLE_ENDIAN true
26
  #else
H
heyongqiang 已提交
27
    #define PLATFORM_IS_LITTLE_ENDIAN false
28
  #endif
29
#elif defined(OS_FREEBSD) || defined(OS_OPENBSD) || defined(OS_NETBSD) ||\
H
heyongqiang 已提交
30
      defined(OS_DRAGONFLYBSD) || defined(OS_ANDROID)
31 32
  #include <sys/types.h>
  #include <sys/endian.h>
33 34 35
#else
  #include <endian.h>
#endif
J
jorlow@chromium.org 已提交
36
#include <pthread.h>
37 38 39
#ifdef SNAPPY
#include <snappy.h>
#endif
H
heyongqiang 已提交
40

H
heyongqiang 已提交
41 42 43
#ifdef ZLIB
#include <zlib.h>
#endif
H
heyongqiang 已提交
44 45 46 47 48

#ifdef BZIP2
#include <bzlib.h>
#endif

A
Albert Strasheim 已提交
49 50 51 52 53
#if defined(LZ4)
#include <lz4.h>
#include <lz4hc.h>
#endif

J
jorlow@chromium.org 已提交
54 55
#include <stdint.h>
#include <string>
H
heyongqiang 已提交
56
#include <string.h>
57
#include "rocksdb/options.h"
58 59
#include "port/atomic_pointer.h"

H
heyongqiang 已提交
60 61
#ifndef PLATFORM_IS_LITTLE_ENDIAN
#define PLATFORM_IS_LITTLE_ENDIAN (__BYTE_ORDER == __LITTLE_ENDIAN)
62 63
#endif

64
#if defined(OS_MACOSX) || defined(OS_SOLARIS) || defined(OS_FREEBSD) ||\
H
heyongqiang 已提交
65 66
    defined(OS_NETBSD) || defined(OS_OPENBSD) || defined(OS_DRAGONFLYBSD) ||\
    defined(OS_ANDROID)
67
// Use fread/fwrite/fflush on platforms without _unlocked variants
68 69 70 71 72
#define fread_unlocked fread
#define fwrite_unlocked fwrite
#define fflush_unlocked fflush
#endif

73 74 75
#if defined(OS_MACOSX) || defined(OS_FREEBSD) ||\
    defined(OS_OPENBSD) || defined(OS_DRAGONFLYBSD)
// Use fsync() on platforms without fdatasync()
76 77
#define fdatasync fsync
#endif
J
jorlow@chromium.org 已提交
78

H
heyongqiang 已提交
79 80 81 82 83 84
#if defined(OS_ANDROID) && __ANDROID_API__ < 9
// fdatasync() was only introduced in API level 9 on Android. Use fsync()
// when targetting older platforms.
#define fdatasync fsync
#endif

85
namespace rocksdb {
J
jorlow@chromium.org 已提交
86 87
namespace port {

H
heyongqiang 已提交
88 89
static const bool kLittleEndian = PLATFORM_IS_LITTLE_ENDIAN;
#undef PLATFORM_IS_LITTLE_ENDIAN
J
jorlow@chromium.org 已提交
90 91 92 93 94

class CondVar;

class Mutex {
 public:
95
  /* implicit */ Mutex(bool adaptive = false);
J
jorlow@chromium.org 已提交
96 97 98 99
  ~Mutex();

  void Lock();
  void Unlock();
100 101
  // this will assert if the mutex is not locked
  // it does NOT verify that mutex is held by a calling thread
I
Igor Canadi 已提交
102
  void AssertHeld();
J
jorlow@chromium.org 已提交
103 104 105 106

 private:
  friend class CondVar;
  pthread_mutex_t mu_;
I
Igor Canadi 已提交
107
#ifndef NDEBUG
108
  bool locked_;
I
Igor Canadi 已提交
109
#endif
J
jorlow@chromium.org 已提交
110 111 112 113 114 115

  // No copying
  Mutex(const Mutex&);
  void operator=(const Mutex&);
};

116 117 118 119 120 121 122
class RWMutex {
 public:
  RWMutex();
  ~RWMutex();

  void ReadLock();
  void WriteLock();
123 124
  void ReadUnlock();
  void WriteUnlock();
125 126 127 128 129 130 131 132 133 134
  void AssertHeld() { }

 private:
  pthread_rwlock_t mu_; // the underlying platform mutex

  // No copying allowed
  RWMutex(const RWMutex&);
  void operator=(const RWMutex&);
};

J
jorlow@chromium.org 已提交
135 136 137 138 139 140 141 142 143 144 145 146
class CondVar {
 public:
  explicit CondVar(Mutex* mu);
  ~CondVar();
  void Wait();
  void Signal();
  void SignalAll();
 private:
  pthread_cond_t cv_;
  Mutex* mu_;
};

H
heyongqiang 已提交
147 148 149 150
typedef pthread_once_t OnceType;
#define LEVELDB_ONCE_INIT PTHREAD_ONCE_INIT
extern void InitOnce(OnceType* once, void (*initializer)());

151 152
inline bool Snappy_Compress(const CompressionOptions& opts, const char* input,
                            size_t length, ::std::string* output) {
153
#ifdef SNAPPY
154
  output->resize(snappy::MaxCompressedLength(length));
155
  size_t outlen;
156
  snappy::RawCompress(input, length, &(*output)[0], &outlen);
157 158 159 160
  output->resize(outlen);
  return true;
#endif

J
jorlow@chromium.org 已提交
161
  return false;
J
jorlow@chromium.org 已提交
162 163
}

164 165
inline bool Snappy_GetUncompressedLength(const char* input, size_t length,
                                         size_t* result) {
166
#ifdef SNAPPY
167 168 169
  return snappy::GetUncompressedLength(input, length, result);
#else
  return false;
170
#endif
171
}
172

173 174 175 176 177
inline bool Snappy_Uncompress(const char* input, size_t length,
                              char* output) {
#ifdef SNAPPY
  return snappy::RawUncompress(input, length, output);
#else
J
jorlow@chromium.org 已提交
178
  return false;
179
#endif
J
jorlow@chromium.org 已提交
180 181
}

182 183
inline bool Zlib_Compress(const CompressionOptions& opts, const char* input,
                          size_t length, ::std::string* output) {
H
heyongqiang 已提交
184 185 186 187 188 189 190 191 192
#ifdef ZLIB
  // The memLevel parameter specifies how much memory should be allocated for
  // the internal compression state.
  // memLevel=1 uses minimum memory but is slow and reduces compression ratio.
  // memLevel=9 uses maximum memory for optimal speed.
  // The default value is 8. See zconf.h for more details.
  static const int memLevel = 8;
  z_stream _stream;
  memset(&_stream, 0, sizeof(z_stream));
193 194
  int st = deflateInit2(&_stream, opts.level, Z_DEFLATED, opts.window_bits,
                        memLevel, opts.strategy);
H
heyongqiang 已提交
195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210
  if (st != Z_OK) {
    return false;
  }

  // Resize output to be the plain data length.
  // This may not be big enough if the compression actually expands data.
  output->resize(length);

  // Compress the input, and put compressed data in output.
  _stream.next_in = (Bytef *)input;
  _stream.avail_in = length;

  // Initialize the output size.
  _stream.avail_out = length;
  _stream.next_out = (Bytef *)&(*output)[0];

211 212 213
  int old_sz =0, new_sz =0, new_sz_delta =0;
  bool done = false;
  while (!done) {
H
heyongqiang 已提交
214 215 216
    int st = deflate(&_stream, Z_FINISH);
    switch (st) {
      case Z_STREAM_END:
217
        done = true;
H
heyongqiang 已提交
218 219 220 221 222
        break;
      case Z_OK:
        // No output space. Increase the output space by 20%.
        // (Should we fail the compression since it expands the size?)
        old_sz = output->size();
223 224
        new_sz_delta = (int)(output->size() * 0.2);
        new_sz = output->size() + (new_sz_delta < 10 ? 10 : new_sz_delta);
H
heyongqiang 已提交
225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244
        output->resize(new_sz);
        // Set more output.
        _stream.next_out = (Bytef *)&(*output)[old_sz];
        _stream.avail_out = new_sz - old_sz;
        break;
      case Z_BUF_ERROR:
      default:
        deflateEnd(&_stream);
        return false;
    }
  }

  output->resize(output->size() - _stream.avail_out);
  deflateEnd(&_stream);
  return true;
#endif
  return false;
}

inline char* Zlib_Uncompress(const char* input_data, size_t input_length,
245
    int* decompress_size, int windowBits = -14) {
H
heyongqiang 已提交
246 247 248 249
#ifdef ZLIB
  z_stream _stream;
  memset(&_stream, 0, sizeof(z_stream));

250
  // For raw inflate, the windowBits should be -8..-15.
H
heyongqiang 已提交
251 252 253 254 255
  // If windowBits is bigger than zero, it will use either zlib
  // header or gzip header. Adding 32 to it will do automatic detection.
  int st = inflateInit2(&_stream,
      windowBits > 0 ? windowBits + 32 : windowBits);
  if (st != Z_OK) {
256
    return nullptr;
H
heyongqiang 已提交
257 258 259 260 261 262 263 264 265 266 267 268 269
  }

  _stream.next_in = (Bytef *)input_data;
  _stream.avail_in = input_length;

  // Assume the decompressed data size will 5x of compressed size.
  int output_len = input_length * 5;
  char* output = new char[output_len];
  int old_sz = output_len;

  _stream.next_out = (Bytef *)output;
  _stream.avail_out = output_len;

270 271 272
  char* tmp = nullptr;
  int output_len_delta;
  bool done = false;
H
heyongqiang 已提交
273

274 275
  //while(_stream.next_in != nullptr && _stream.avail_in != 0) {
  while (!done) {
H
heyongqiang 已提交
276 277 278
    int st = inflate(&_stream, Z_SYNC_FLUSH);
    switch (st) {
      case Z_STREAM_END:
279
        done = true;
H
heyongqiang 已提交
280 281 282 283
        break;
      case Z_OK:
        // No output space. Increase the output space by 20%.
        old_sz = output_len;
284 285
        output_len_delta = (int)(output_len * 0.2);
        output_len += output_len_delta < 10 ? 10 : output_len_delta;
H
heyongqiang 已提交
286 287 288 289 290 291 292 293 294 295 296 297 298
        tmp = new char[output_len];
        memcpy(tmp, output, old_sz);
        delete[] output;
        output = tmp;

        // Set more output.
        _stream.next_out = (Bytef *)(output + old_sz);
        _stream.avail_out = output_len - old_sz;
        break;
      case Z_BUF_ERROR:
      default:
        delete[] output;
        inflateEnd(&_stream);
299
        return nullptr;
H
heyongqiang 已提交
300 301 302 303 304 305 306 307
    }
  }

  *decompress_size = output_len - _stream.avail_out;
  inflateEnd(&_stream);
  return output;
#endif

308
  return nullptr;
H
heyongqiang 已提交
309 310
}

311 312
inline bool BZip2_Compress(const CompressionOptions& opts, const char* input,
                           size_t length, ::std::string* output) {
H
heyongqiang 已提交
313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337
#ifdef BZIP2
  bz_stream _stream;
  memset(&_stream, 0, sizeof(bz_stream));

  // Block size 1 is 100K.
  // 0 is for silent.
  // 30 is the default workFactor
  int st = BZ2_bzCompressInit(&_stream, 1, 0, 30);
  if (st != BZ_OK) {
    return false;
  }

  // Resize output to be the plain data length.
  // This may not be big enough if the compression actually expands data.
  output->resize(length);

  // Compress the input, and put compressed data in output.
  _stream.next_in = (char *)input;
  _stream.avail_in = length;

  // Initialize the output size.
  _stream.next_out = (char *)&(*output)[0];
  _stream.avail_out = length;

  int old_sz =0, new_sz =0;
338
  while(_stream.next_in != nullptr && _stream.avail_in != 0) {
H
heyongqiang 已提交
339 340 341 342 343 344 345 346
    int st = BZ2_bzCompress(&_stream, BZ_FINISH);
    switch (st) {
      case BZ_STREAM_END:
        break;
      case BZ_FINISH_OK:
        // No output space. Increase the output space by 20%.
        // (Should we fail the compression since it expands the size?)
        old_sz = output->size();
H
heyongqiang 已提交
347
        new_sz = (int)(output->size() * 1.2);
H
heyongqiang 已提交
348 349 350 351 352
        output->resize(new_sz);
        // Set more output.
        _stream.next_out = (char *)&(*output)[old_sz];
        _stream.avail_out = new_sz - old_sz;
        break;
I
Igor Canadi 已提交
353
      case BZ_SEQUENCE_ERROR:
H
heyongqiang 已提交
354 355 356 357 358 359 360 361 362 363
      default:
        BZ2_bzCompressEnd(&_stream);
        return false;
    }
  }

  output->resize(output->size() - _stream.avail_out);
  BZ2_bzCompressEnd(&_stream);
  return true;
#endif
H
heyongqiang 已提交
364
  return false;
H
heyongqiang 已提交
365 366
}

A
Albert Strasheim 已提交
367 368
inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
                              int* decompress_size) {
H
heyongqiang 已提交
369 370 371 372 373 374
#ifdef BZIP2
  bz_stream _stream;
  memset(&_stream, 0, sizeof(bz_stream));

  int st = BZ2_bzDecompressInit(&_stream, 0, 0);
  if (st != BZ_OK) {
375
    return nullptr;
H
heyongqiang 已提交
376 377 378 379 380 381 382 383 384 385 386 387 388
  }

  _stream.next_in = (char *)input_data;
  _stream.avail_in = input_length;

  // Assume the decompressed data size will be 5x of compressed size.
  int output_len = input_length * 5;
  char* output = new char[output_len];
  int old_sz = output_len;

  _stream.next_out = (char *)output;
  _stream.avail_out = output_len;

389
  char* tmp = nullptr;
H
heyongqiang 已提交
390

391
  while(_stream.next_in != nullptr && _stream.avail_in != 0) {
H
heyongqiang 已提交
392 393 394 395
    int st = BZ2_bzDecompress(&_stream);
    switch (st) {
      case BZ_STREAM_END:
        break;
I
Igor Canadi 已提交
396
      case BZ_OK:
H
heyongqiang 已提交
397 398
        // No output space. Increase the output space by 20%.
        old_sz = output_len;
H
heyongqiang 已提交
399
        output_len = (int)(output_len * 1.2);
H
heyongqiang 已提交
400 401 402 403 404 405 406 407 408 409 410 411
        tmp = new char[output_len];
        memcpy(tmp, output, old_sz);
        delete[] output;
        output = tmp;

        // Set more output.
        _stream.next_out = (char *)(output + old_sz);
        _stream.avail_out = output_len - old_sz;
        break;
      default:
        delete[] output;
        BZ2_bzDecompressEnd(&_stream);
412
        return nullptr;
H
heyongqiang 已提交
413 414 415 416 417 418 419
    }
  }

  *decompress_size = output_len - _stream.avail_out;
  BZ2_bzDecompressEnd(&_stream);
  return output;
#endif
420
  return nullptr;
H
heyongqiang 已提交
421 422
}

A
Albert Strasheim 已提交
423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468
inline bool LZ4_Compress(const CompressionOptions &opts, const char *input,
                         size_t length, ::std::string* output) {
#ifdef LZ4
  int compressBound = LZ4_compressBound(length);
  output->resize(8 + compressBound);
  char *p = const_cast<char *>(output->c_str());
  memcpy(p, &length, sizeof(length));
  size_t outlen;
  outlen = LZ4_compress_limitedOutput(input, p + 8, length, compressBound);
  if (outlen == 0) {
    return false;
  }
  output->resize(8 + outlen);
  return true;
#endif
  return false;
}

inline char* LZ4_Uncompress(const char* input_data, size_t input_length,
                            int* decompress_size) {
#ifdef LZ4
  if (input_length < 8) {
    return nullptr;
  }
  int output_len;
  memcpy(&output_len, input_data, sizeof(output_len));
  char *output = new char[output_len];
  *decompress_size = LZ4_decompress_safe_partial(
      input_data + 8, output, input_length - 8, output_len, output_len);
  if (*decompress_size < 0) {
    delete[] output;
    return nullptr;
  }
  return output;
#endif
  return nullptr;
}

inline bool LZ4HC_Compress(const CompressionOptions &opts, const char* input,
                           size_t length, ::std::string* output) {
#ifdef LZ4
  int compressBound = LZ4_compressBound(length);
  output->resize(8 + compressBound);
  char *p = const_cast<char *>(output->c_str());
  memcpy(p, &length, sizeof(length));
  size_t outlen;
469
#ifdef LZ4_VERSION_MAJOR  // they only started defining this since r113
A
Albert Strasheim 已提交
470 471
  outlen = LZ4_compressHC2_limitedOutput(input, p + 8, length, compressBound,
                                         opts.level);
472 473 474
#else
  outlen = LZ4_compressHC_limitedOutput(input, p + 8, length, compressBound);
#endif
A
Albert Strasheim 已提交
475 476 477 478 479 480 481 482 483
  if (outlen == 0) {
    return false;
  }
  output->resize(8 + outlen);
  return true;
#endif
  return false;
}

L
Lei Jin 已提交
484 485
#define CACHE_LINE_SIZE 64U

486 487
#define PREFETCH(addr, rw, locality) __builtin_prefetch(addr, rw, locality)

488
} // namespace port
489
} // namespace rocksdb
490 491

#endif  // STORAGE_LEVELDB_PORT_PORT_POSIX_H_