//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// See port_example.h for documentation for the following types/functions.

#ifndef STORAGE_LEVELDB_PORT_PORT_POSIX_H_
#define STORAGE_LEVELDB_PORT_PORT_POSIX_H_

// Host byte-order detection.
// On macOS and Solaris, PLATFORM_IS_LITTLE_ENDIAN is defined directly from the
// platform headers. On the BSDs/Android and everywhere else, only the header
// that provides the byte-order macros is included here; the generic
// (__BYTE_ORDER == __LITTLE_ENDIAN) fallback is defined later in this file.
#undef PLATFORM_IS_LITTLE_ENDIAN
#if defined(OS_MACOSX)
  #include <machine/endian.h>
  #if defined(__DARWIN_LITTLE_ENDIAN) && defined(__DARWIN_BYTE_ORDER)
    #define PLATFORM_IS_LITTLE_ENDIAN \
        (__DARWIN_BYTE_ORDER == __DARWIN_LITTLE_ENDIAN)
  #endif
#elif defined(OS_SOLARIS)
  #include <sys/isa_defs.h>
  #ifdef _LITTLE_ENDIAN
    #define PLATFORM_IS_LITTLE_ENDIAN true
  #else
    #define PLATFORM_IS_LITTLE_ENDIAN false
  #endif
#elif defined(OS_FREEBSD) || defined(OS_OPENBSD) || defined(OS_NETBSD) ||\
      defined(OS_DRAGONFLYBSD) || defined(OS_ANDROID)
  #include <sys/types.h>
  #include <sys/endian.h>
#else
  #include <endian.h>
#endif
#include <pthread.h>
#include <stdint.h>
#include <string.h>

#include <limits>
#include <string>

#ifdef SNAPPY
#include <snappy.h>
#endif

#ifdef ZLIB
#include <zlib.h>
#endif

#ifdef BZIP2
#include <bzlib.h>
#endif

#if defined(LZ4)
#include <lz4.h>
#include <lz4hc.h>
#endif

#include "rocksdb/options.h"
#include "port/atomic_pointer.h"

// Generic byte-order fallback for platforms whose headers did not already
// define PLATFORM_IS_LITTLE_ENDIAN; relies on __BYTE_ORDER/__LITTLE_ENDIAN
// being provided by the previously included endian header.
#ifndef PLATFORM_IS_LITTLE_ENDIAN
#define PLATFORM_IS_LITTLE_ENDIAN (__BYTE_ORDER == __LITTLE_ENDIAN)
#endif

#if defined(OS_MACOSX) || defined(OS_SOLARIS) || defined(OS_FREEBSD) ||\
    defined(OS_NETBSD) || defined(OS_OPENBSD) || defined(OS_DRAGONFLYBSD) ||\
    defined(OS_ANDROID)
// Use fread/fwrite/fflush on platforms without _unlocked variants
#define fread_unlocked fread
#define fwrite_unlocked fwrite
#define fflush_unlocked fflush
#endif

#if defined(OS_MACOSX) || defined(OS_FREEBSD) ||\
    defined(OS_OPENBSD) || defined(OS_DRAGONFLYBSD)
// Use fsync() on platforms without fdatasync()
#define fdatasync fsync
#endif

#if defined(OS_ANDROID) && __ANDROID_API__ < 9
// fdatasync() was only introduced in API level 9 on Android. Use fsync()
// when targeting older platforms.
#define fdatasync fsync
#endif

namespace rocksdb {
namespace port {

H
heyongqiang 已提交
88 89
static const bool kLittleEndian = PLATFORM_IS_LITTLE_ENDIAN;
#undef PLATFORM_IS_LITTLE_ENDIAN
J
jorlow@chromium.org 已提交
90 91 92 93 94

class CondVar;

// Thin wrapper around a pthread_mutex_t. The constructor and the member
// functions are defined outside this header.
class Mutex {
 public:
  /* implicit */ Mutex(bool adaptive = false);
  ~Mutex();

  void Lock();
  void Unlock();
  // this will assert if the mutex is not locked
  // it does NOT verify that mutex is held by a calling thread
  void AssertHeld();

  // No copying: the wrapped pthread_mutex_t must not be duplicated.
  Mutex(const Mutex&) = delete;
  void operator=(const Mutex&) = delete;

 private:
  // CondVar needs direct access to mu_.
  friend class CondVar;
  pthread_mutex_t mu_;
#ifndef NDEBUG
  // Debug-only lock-state flag (presumably what AssertHeld() checks).
  bool locked_;
#endif
};

// Thin wrapper around a pthread_rwlock_t. The constructor and the lock
// functions are defined outside this header.
class RWMutex {
 public:
  RWMutex();
  ~RWMutex();

  void ReadLock();
  void WriteLock();
  void Unlock();
  // No-op here: this wrapper does not track lock ownership.
  void AssertHeld() { }

  // No copying allowed: the wrapped pthread_rwlock_t must not be duplicated.
  RWMutex(const RWMutex&) = delete;
  void operator=(const RWMutex&) = delete;

 private:
  pthread_rwlock_t mu_; // the underlying platform mutex
};

// Condition variable bound to a Mutex; wraps a pthread_cond_t. The member
// functions are defined outside this header.
class CondVar {
 public:
  explicit CondVar(Mutex* mu);
  ~CondVar();
  void Wait();
  void Signal();
  void SignalAll();

  // No copying: POSIX leaves the behavior of a copied pthread_cond_t
  // undefined, so copies of this wrapper must not be made either.
  CondVar(const CondVar&) = delete;
  void operator=(const CondVar&) = delete;

 private:
  pthread_cond_t cv_;
  // The associated mutex; not owned by this object (presumably, since the
  // constructor takes a raw pointer) — confirm against the .cc definition.
  Mutex* mu_;
};

// One-time initialization support, expressed in terms of pthread_once_t.
using OnceType = pthread_once_t;
#define LEVELDB_ONCE_INIT PTHREAD_ONCE_INIT
// Declared here and defined outside this header; presumably runs
// `initializer` once per `once` control object (via pthread_once) — confirm
// against the definition.
extern void InitOnce(OnceType* once, void (*initializer)());

// Compresses input[0, length) with Snappy into *output; `opts` is not used
// by this codec. Returns true on success, or false when Snappy support is not
// compiled in (SNAPPY undefined).
inline bool Snappy_Compress(const CompressionOptions& opts, const char* input,
                            size_t length, ::std::string* output) {
#ifdef SNAPPY
  // Reserve the worst-case size, compress straight into the string's
  // storage, then shrink to the size Snappy actually produced.
  output->resize(snappy::MaxCompressedLength(length));
  char* const dest = &(*output)[0];
  size_t compressed_size;
  snappy::RawCompress(input, length, dest, &compressed_size);
  output->resize(compressed_size);
  return true;
#else
  return false;
#endif
}

// Stores in *result the uncompressed length recorded in the Snappy buffer
// input[0, length), forwarding snappy::GetUncompressedLength's verdict.
// Always returns false when Snappy support is not compiled in.
inline bool Snappy_GetUncompressedLength(const char* input, size_t length,
                                         size_t* result) {
#ifndef SNAPPY
  return false;
#else
  return snappy::GetUncompressedLength(input, length, result);
#endif
}

// Decompresses the raw Snappy buffer input[0, length) into `output`, which
// must have room for the whole uncompressed result (see
// snappy::RawUncompress). Returns snappy's verdict, or false when Snappy
// support is not compiled in.
inline bool Snappy_Uncompress(const char* input, size_t length,
                              char* output) {
#ifndef SNAPPY
  return false;
#else
  return snappy::RawUncompress(input, length, output);
#endif
}

// Compresses input[0, length) with zlib's deflate, using the level, window
// bits and strategy from `opts`, and stores the compressed bytes in *output.
// Returns true on success. Returns false when zlib support is not compiled in
// (ZLIB undefined), when deflate cannot be initialized or fails, or when the
// buffer sizes involved do not fit zlib's uInt byte counters.
inline bool Zlib_Compress(const CompressionOptions& opts, const char* input,
                          size_t length, ::std::string* output) {
#ifdef ZLIB
  // avail_in/avail_out are uInt. Assigning a larger size_t would silently
  // truncate it, and only a prefix of the input would be compressed.
  if (static_cast<uInt>(length) != length) {
    return false;
  }

  // The memLevel parameter specifies how much memory should be allocated for
  // the internal compression state.
  // memLevel=1 uses minimum memory but is slow and reduces compression ratio.
  // memLevel=9 uses maximum memory for optimal speed.
  // The default value is 8. See zconf.h for more details.
  static const int memLevel = 8;
  z_stream _stream;
  memset(&_stream, 0, sizeof(z_stream));
  int st = deflateInit2(&_stream, opts.level, Z_DEFLATED, opts.window_bits,
                        memLevel, opts.strategy);
  if (st != Z_OK) {
    return false;
  }

  // Start with an output buffer as large as the input; it is grown below if
  // the compressed form turns out to be larger.
  output->resize(length);

  _stream.next_in = (Bytef *)input;
  _stream.avail_in = static_cast<uInt>(length);
  _stream.next_out = (Bytef *)&(*output)[0];
  _stream.avail_out = static_cast<uInt>(length);

  bool done = false;
  while (!done) {
    st = deflate(&_stream, Z_FINISH);
    switch (st) {
      case Z_STREAM_END:
        done = true;
        break;
      case Z_BUF_ERROR:
        // "No progress possible". With all input supplied this only happens
        // when there is no output space (e.g. empty input gave a zero-sized
        // buffer), and deflate must then be called again with more space.
        // Anything else is a real failure.
        if (_stream.avail_out != 0) {
          deflateEnd(&_stream);
          return false;
        }
        // fall through: grow the output buffer
      case Z_OK: {
        // Out of output space: grow by 20% (at least 10 bytes), keep what
        // has been written so far and resume right after it.
        const size_t written = output->size() - _stream.avail_out;
        size_t delta = output->size() / 5;
        if (delta < 10) {
          delta = 10;
        }
        const size_t new_sz = output->size() + delta;
        const size_t room = new_sz - written;
        if (static_cast<uInt>(room) != room) {
          deflateEnd(&_stream);
          return false;
        }
        output->resize(new_sz);
        _stream.next_out = (Bytef *)&(*output)[written];
        _stream.avail_out = static_cast<uInt>(room);
        break;
      }
      default:
        deflateEnd(&_stream);
        return false;
    }
  }

  // Drop the unused tail of the buffer.
  output->resize(output->size() - _stream.avail_out);
  deflateEnd(&_stream);
  return true;
#endif
  return false;
}

// Inflates input_data[0, input_length) with zlib. On success, returns a
// buffer allocated with new[] (the caller must delete[] it) and sets
// *decompress_size to the number of valid bytes in it.
//
// A windowBits < 0 selects raw deflate data. A windowBits > 0 enables
// zlib/gzip header auto-detection (windowBits + 32 is passed to
// inflateInit2).
//
// Returns nullptr and leaves *decompress_size untouched when:
//   - any zlib call fails,
//   - the sizes involved do not fit zlib's uInt or the int result, or
//   - zlib support is not compiled in (ZLIB undefined).
inline char* Zlib_Uncompress(const char* input_data, size_t input_length,
    int* decompress_size, int windowBits = -14) {
#ifdef ZLIB
  // The result size is reported as an int, so the buffer never grows past
  // INT_MAX bytes (which also fits zlib's unsigned uInt counters).
  const size_t kMaxOutput =
      static_cast<size_t>(std::numeric_limits<int>::max());
  // avail_in is a uInt; a larger length would silently drop input.
  if (static_cast<uInt>(input_length) != input_length) {
    return nullptr;
  }

  z_stream _stream;
  memset(&_stream, 0, sizeof(z_stream));

  // For raw inflate, the windowBits should be -8..-15.
  // If windowBits is bigger than zero, it will use either zlib
  // header or gzip header. Adding 32 to it will do automatic detection.
  int st = inflateInit2(&_stream,
      windowBits > 0 ? windowBits + 32 : windowBits);
  if (st != Z_OK) {
    return nullptr;
  }

  _stream.next_in = (Bytef *)input_data;
  _stream.avail_in = static_cast<uInt>(input_length);

  // Assume the decompressed data size will be 5x of compressed size. The
  // estimate is computed in size_t and clamped, so it cannot overflow an int.
  size_t output_len =
      input_length <= kMaxOutput / 5 ? input_length * 5 : kMaxOutput;
  char* output = new char[output_len];

  _stream.next_out = (Bytef *)output;
  _stream.avail_out = static_cast<uInt>(output_len);

  bool done = false;
  while (!done) {
    st = inflate(&_stream, Z_SYNC_FLUSH);
    switch (st) {
      case Z_STREAM_END:
        done = true;
        break;
      case Z_OK: {
        // No output space. Increase the output space by 20% (at least 10
        // bytes), capped at kMaxOutput, keeping the bytes inflated so far.
        if (output_len >= kMaxOutput) {
          delete[] output;
          inflateEnd(&_stream);
          return nullptr;
        }
        const size_t written = output_len - _stream.avail_out;
        size_t delta = output_len / 5;
        if (delta < 10) {
          delta = 10;
        }
        output_len = (output_len > kMaxOutput - delta) ? kMaxOutput
                                                        : output_len + delta;
        char* tmp = new char[output_len];
        memcpy(tmp, output, written);
        delete[] output;
        output = tmp;

        // Resume writing right after the bytes already produced.
        _stream.next_out = (Bytef *)(output + written);
        _stream.avail_out = static_cast<uInt>(output_len - written);
        break;
      }
      case Z_BUF_ERROR:
      default:
        delete[] output;
        inflateEnd(&_stream);
        return nullptr;
    }
  }

  *decompress_size = static_cast<int>(output_len - _stream.avail_out);
  inflateEnd(&_stream);
  return output;
#endif

  return nullptr;
}

// Compresses input[0, length) with bzip2 into *output. It uses a fixed block
// size of 1 (100K), silent verbosity and the default workFactor of 30;
// `opts` is not used.
//
// Returns true on success. Returns false when:
//   - bzip2 support is not compiled in (BZIP2 undefined),
//   - `length` does not fit bzip2's unsigned int counters, or
//   - any bzip2 call fails.
inline bool BZip2_Compress(const CompressionOptions& opts, const char* input,
                           size_t length, ::std::string* output) {
#ifdef BZIP2
  // bz_stream counts bytes in unsigned int; a larger length would silently
  // drop input.
  if (static_cast<unsigned int>(length) != length) {
    return false;
  }

  bz_stream _stream;
  memset(&_stream, 0, sizeof(bz_stream));

  // Block size 1 is 100K.
  // 0 is for silent.
  // 30 is the default workFactor
  int st = BZ2_bzCompressInit(&_stream, 1, 0, 30);
  if (st != BZ_OK) {
    return false;
  }

  // Start with an output buffer as large as the input; it is grown below if
  // the compressed form turns out to be larger.
  output->resize(length);

  _stream.next_in = (char *)input;
  _stream.avail_in = static_cast<unsigned int>(length);
  _stream.next_out = (char *)&(*output)[0];
  _stream.avail_out = static_cast<unsigned int>(length);

  // Keep calling with BZ_FINISH until bzip2 reports BZ_STREAM_END.
  // BZ_FINISH_OK means the stream is not complete yet, even if all input has
  // already been consumed. A loop that stopped on avail_in == 0 could return
  // an unfinished stream.
  bool done = false;
  while (!done) {
    st = BZ2_bzCompress(&_stream, BZ_FINISH);
    switch (st) {
      case BZ_STREAM_END:
        done = true;
        break;
      case BZ_FINISH_OK: {
        // No output space. Increase the output space by 20%. The minimum of
        // 10 bytes ensures tiny buffers always grow. Writing resumes after
        // the bytes already produced.
        const size_t written = output->size() - _stream.avail_out;
        size_t delta = output->size() / 5;
        if (delta < 10) {
          delta = 10;
        }
        const size_t new_sz = output->size() + delta;
        const size_t room = new_sz - written;
        if (static_cast<unsigned int>(room) != room) {
          BZ2_bzCompressEnd(&_stream);
          return false;
        }
        output->resize(new_sz);
        _stream.next_out = (char *)&(*output)[written];
        _stream.avail_out = static_cast<unsigned int>(room);
        break;
      }
      case BZ_SEQUENCE_ERROR:
      default:
        BZ2_bzCompressEnd(&_stream);
        return false;
    }
  }

  // Drop the unused tail of the buffer.
  output->resize(output->size() - _stream.avail_out);
  BZ2_bzCompressEnd(&_stream);
  return true;
#endif
  return false;
}

// Decompresses the bzip2 stream input_data[0, input_length). On success,
// returns a buffer allocated with new[] (the caller must delete[] it) and
// sets *decompress_size to the number of valid bytes in it.
//
// An empty input yields an empty buffer of size 0, which preserves the
// historical behavior.
//
// Returns nullptr and leaves *decompress_size untouched when:
//   - any bzip2 error occurs,
//   - the stream is truncated,
//   - the sizes involved do not fit the counters or the int result, or
//   - bzip2 support is not compiled in (BZIP2 undefined).
inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
                              int* decompress_size) {
#ifdef BZIP2
  // The result size is reported as an int, so the buffer never grows past
  // INT_MAX bytes (which also fits bzip2's unsigned int counters).
  const size_t kMaxOutput =
      static_cast<size_t>(std::numeric_limits<int>::max());
  // bz_stream counts bytes in unsigned int; a larger length would silently
  // drop input.
  if (static_cast<unsigned int>(input_length) != input_length) {
    return nullptr;
  }
  if (input_length == 0) {
    *decompress_size = 0;
    return new char[0];
  }

  bz_stream _stream;
  memset(&_stream, 0, sizeof(bz_stream));

  int st = BZ2_bzDecompressInit(&_stream, 0, 0);
  if (st != BZ_OK) {
    return nullptr;
  }

  _stream.next_in = (char *)input_data;
  _stream.avail_in = static_cast<unsigned int>(input_length);

  // Assume the decompressed data size will be 5x of compressed size. The
  // estimate is computed in size_t and clamped, so it cannot overflow an int.
  size_t output_len =
      input_length <= kMaxOutput / 5 ? input_length * 5 : kMaxOutput;
  char* output = new char[output_len];

  _stream.next_out = output;
  _stream.avail_out = static_cast<unsigned int>(output_len);

  // Decompress until the logical end of the stream. A loop that stopped on
  // avail_in == 0 could return while decompressed output was still pending.
  bool done = false;
  while (!done) {
    st = BZ2_bzDecompress(&_stream);
    switch (st) {
      case BZ_STREAM_END:
        done = true;
        break;
      case BZ_OK: {
        // All input is supplied up front. Stopping with output space still
        // free therefore means the data ended before the stream did
        // (truncated input), so fail instead of looping forever. Also fail
        // once the int-sized cap is reached.
        if (_stream.avail_out != 0 || output_len >= kMaxOutput) {
          delete[] output;
          BZ2_bzDecompressEnd(&_stream);
          return nullptr;
        }
        // Output buffer is full: grow it by 20% (at least 10 bytes), capped
        // at kMaxOutput.
        const size_t old_len = output_len;
        size_t delta = output_len / 5;
        if (delta < 10) {
          delta = 10;
        }
        output_len = (output_len > kMaxOutput - delta) ? kMaxOutput
                                                        : output_len + delta;
        char* tmp = new char[output_len];
        memcpy(tmp, output, old_len);
        delete[] output;
        output = tmp;

        // Resume writing right after the bytes already produced.
        _stream.next_out = output + old_len;
        _stream.avail_out = static_cast<unsigned int>(output_len - old_len);
        break;
      }
      default:
        delete[] output;
        BZ2_bzDecompressEnd(&_stream);
        return nullptr;
    }
  }

  *decompress_size = static_cast<int>(output_len - _stream.avail_out);
  BZ2_bzDecompressEnd(&_stream);
  return output;
#endif
  return nullptr;
}

// Compresses input[0, length) with LZ4 into *output. `opts` is not used.
//
// Output layout: an 8-byte header holding the raw sizeof(size_t) bytes of
// `length` in host byte order (zero-padded to 8), followed by the LZ4 block.
//
// Returns false when:
//   - LZ4 support is not compiled in,
//   - `length` exceeds what the int-based LZ4 API accepts, or
//   - compression fails.
inline bool LZ4_Compress(const CompressionOptions &opts, const char *input,
                         size_t length, ::std::string* output) {
#ifdef LZ4
  // The LZ4 API takes int sizes; a larger length would be truncated and only
  // a prefix of the input compressed.
  if (length > static_cast<size_t>(std::numeric_limits<int>::max())) {
    return false;
  }
  const int input_size = static_cast<int>(length);
  const int compressBound = LZ4_compressBound(input_size);
  if (compressBound <= 0) {
    return false;  // input is larger than LZ4 supports
  }
  output->resize(8 + compressBound);
  // Write through the string's own storage rather than casting away the
  // constness of c_str().
  char* p = &(*output)[0];
  memcpy(p, &length, sizeof(length));
  const int outlen =
      LZ4_compress_limitedOutput(input, p + 8, input_size, compressBound);
  if (outlen <= 0) {
    return false;
  }
  output->resize(8 + outlen);
  return true;
#endif
  return false;
}

// Decompresses a buffer in the layout produced by this file's LZ4 compressors:
// an 8-byte length header followed by an LZ4 block. On success, returns a
// buffer allocated with new[] (the caller must delete[] it) and sets
// *decompress_size to the number of bytes produced.
//
// Returns nullptr and leaves *decompress_size untouched when:
//   - the input is too short for the header,
//   - the header length is negative,
//   - the compressed size does not fit LZ4's int API,
//   - decompression fails, or
//   - LZ4 support is not compiled in.
inline char* LZ4_Uncompress(const char* input_data, size_t input_length,
                            int* decompress_size) {
#ifdef LZ4
  if (input_length < 8) {
    return nullptr;
  }
  // LZ4 takes the compressed size as an int.
  if (input_length - 8 >
      static_cast<size_t>(std::numeric_limits<int>::max())) {
    return nullptr;
  }
  // NOTE(review): the compressors store the length by memcpy of a size_t,
  // but only the first sizeof(int) bytes are read back here. The two agree
  // only on little-endian hosts — TODO confirm big-endian support.
  int output_len;
  memcpy(&output_len, input_data, sizeof(output_len));
  if (output_len < 0) {
    // Corrupt header; a negative size must never reach new[].
    return nullptr;
  }
  char *output = new char[output_len];
  const int produced = LZ4_decompress_safe_partial(
      input_data + 8, output, static_cast<int>(input_length - 8), output_len,
      output_len);
  if (produced < 0) {
    delete[] output;
    return nullptr;
  }
  *decompress_size = produced;
  return output;
#endif
  return nullptr;
}

// Compresses input[0, length) with LZ4's high-compression mode into *output.
// It uses the same layout as LZ4_Compress: an 8-byte header holding the raw
// sizeof(size_t) bytes of `length` in host byte order (zero-padded to 8),
// followed by the LZ4 block. `opts.level` is passed through when the LZ4
// headers define LZ4_VERSION_MAJOR.
//
// Returns false when:
//   - LZ4 support is not compiled in,
//   - `length` exceeds what the int-based LZ4 API accepts, or
//   - compression fails.
inline bool LZ4HC_Compress(const CompressionOptions &opts, const char* input,
                           size_t length, ::std::string* output) {
#ifdef LZ4
  // The LZ4 API takes int sizes; a larger length would be truncated and only
  // a prefix of the input compressed.
  if (length > static_cast<size_t>(std::numeric_limits<int>::max())) {
    return false;
  }
  const int input_size = static_cast<int>(length);
  const int compressBound = LZ4_compressBound(input_size);
  if (compressBound <= 0) {
    return false;  // input is larger than LZ4 supports
  }
  output->resize(8 + compressBound);
  // Write through the string's own storage rather than casting away the
  // constness of c_str().
  char *p = &(*output)[0];
  memcpy(p, &length, sizeof(length));
  int outlen;
#ifdef LZ4_VERSION_MAJOR  // they only started defining this since r113
  outlen = LZ4_compressHC2_limitedOutput(input, p + 8, input_size,
                                         compressBound, opts.level);
#else
  outlen = LZ4_compressHC_limitedOutput(input, p + 8, input_size,
                                        compressBound);
#endif
  if (outlen <= 0) {
    return false;
  }
  output->resize(8 + outlen);
  return true;
#endif
  return false;
}

// Cache-line size assumed by this port (64, presumably bytes).
#define CACHE_LINE_SIZE 64U

// Prefetch hint forwarded unchanged to the GCC/Clang __builtin_prefetch
// builtin; see its documentation for the meaning of the two hint arguments.
#define PREFETCH(address, rw_hint, locality_hint) \
  __builtin_prefetch(address, rw_hint, locality_hint)

} // namespace port
} // namespace rocksdb

#endif  // STORAGE_LEVELDB_PORT_PORT_POSIX_H_