osSemaphore.c 11.6 KB
Newer Older
S
slguan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "os.h"
S
Shengliang Guan 已提交
18

wafwerar's avatar
wafwerar 已提交
19
#ifdef WINDOWS
S
Shengliang Guan 已提交
20 21 22 23 24 25 26

/*
 * windows implementation
 */

#include <windows.h>

wafwerar's avatar
wafwerar 已提交
27
bool taosCheckPthreadValid(TdThread thread) { return thread.p != NULL; }
S
Shengliang Guan 已提交
28

wafwerar's avatar
wafwerar 已提交
29
void taosResetPthread(TdThread* thread) { thread->p = 0; }
S
Shengliang Guan 已提交
30

wafwerar's avatar
wafwerar 已提交
31
int64_t taosGetPthreadId(TdThread thread) {
S
Shengliang Guan 已提交
32 33 34 35 36 37 38 39 40
#ifdef PTW32_VERSION
  return pthread_getw32threadid_np(thread);
#else
  return (int64_t)thread;
#endif
}

int64_t taosGetSelfPthreadId() { return GetCurrentThreadId(); }

wafwerar's avatar
wafwerar 已提交
41
bool taosComparePthread(TdThread first, TdThread second) { return first.p == second.p; }
S
Shengliang Guan 已提交
42 43 44

int32_t taosGetPId() { return GetCurrentProcessId(); }

45
int32_t taosGetAppName(char* name, int32_t* len) {
S
Shengliang Guan 已提交
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
  char filepath[1024] = {0};

  GetModuleFileName(NULL, filepath, MAX_PATH);
  char* sub = strrchr(filepath, '.');
  if (sub != NULL) {
    *sub = '\0';
  }
  strcpy(name, filepath);

  if (len != NULL) {
    *len = (int32_t)strlen(filepath);
  }

  return 0;
}

int32_t tsem_wait(tsem_t* sem) {
  int ret = 0;
  do {
    ret = sem_wait(sem);
  } while (ret != 0 && errno == EINTR);
  return ret;
}

#elif defined(_TD_DARWIN_64)

/*
 * darwin implementation
 */

F
freemine 已提交
76 77
#include <libproc.h>

78 79 80 81 82 83
// #define SEM_USE_PTHREAD
// #define SEM_USE_POSIX
#define SEM_USE_SEM

#ifdef SEM_USE_SEM
#include <mach/mach_error.h>
S
Shengliang Guan 已提交
84
#include <mach/mach_init.h>
85 86 87
#include <mach/semaphore.h>
#include <mach/task.h>

wafwerar's avatar
wafwerar 已提交
88 89
static TdThread      sem_thread;
static TdThreadOnce sem_once;
S
Shengliang Guan 已提交
90 91 92
static task_t         sem_port;
static volatile int   sem_inited = 0;
static semaphore_t    sem_exit;
93

S
Shengliang Guan 已提交
94
static void *sem_thread_routine(void *arg) {
95
  (void)arg;
96 97
  setThreadName("sem_thrd");

98 99 100
  sem_port = mach_task_self();
  kern_return_t ret = semaphore_create(sem_port, &sem_exit, SYNC_POLICY_FIFO, 0);
  if (ret != KERN_SUCCESS) {
wafwerar's avatar
wafwerar 已提交
101
    fprintf(stderr, "==%s[%d]%s()==failed to create sem_exit\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__);
102 103 104 105 106 107 108 109 110
    sem_inited = -1;
    return NULL;
  }
  sem_inited = 1;
  semaphore_wait(sem_exit);
  return NULL;
}

static void once_init(void) {
S
TD-4088  
Shengliang Guan 已提交
111
  int r = 0;
wafwerar's avatar
wafwerar 已提交
112
  r = taosThreadCreate(&sem_thread, NULL, sem_thread_routine, NULL);
113
  if (r) {
wafwerar's avatar
wafwerar 已提交
114
    fprintf(stderr, "==%s[%d]%s()==failed to create thread\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__);
115 116
    return;
  }
S
Shengliang Guan 已提交
117
  while (sem_inited == 0) {
118 119 120 121 122
    ;
  }
}
#endif

F
freemine 已提交
123
struct tsem_s {
124
#ifdef SEM_USE_PTHREAD
wafwerar's avatar
wafwerar 已提交
125 126
  TdThreadMutex  lock;
  TdThreadCond   cond;
S
Shengliang Guan 已提交
127
  volatile int64_t val;
128
#elif defined(SEM_USE_POSIX)
S
Shengliang Guan 已提交
129 130
  size_t        id;
  sem_t *       sem;
131
#elif defined(SEM_USE_SEM)
S
Shengliang Guan 已提交
132 133 134 135
  semaphore_t sem;
#else   // SEM_USE_PTHREAD
  dispatch_semaphore_t sem;
#endif  // SEM_USE_PTHREAD
136

S
Shengliang Guan 已提交
137
  volatile unsigned int valid : 1;
F
freemine 已提交
138 139
};

S
TD-4088  
Shengliang Guan 已提交
140
int tsem_init(tsem_t *sem, int pshared, unsigned int value) {
wafwerar's avatar
wafwerar 已提交
141
  // fprintf(stderr, "==%s[%d]%s():[%p]==creating\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
F
freemine 已提交
142
  if (*sem) {
wafwerar's avatar
wafwerar 已提交
143
    fprintf(stderr, "==%s[%d]%s():[%p]==already initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
F
freemine 已提交
144
    abort();
S
slguan 已提交
145
  }
wafwerar's avatar
wafwerar 已提交
146
  struct tsem_s *p = (struct tsem_s *)taosMemoryCalloc(1, sizeof(*p));
F
freemine 已提交
147
  if (!p) {
wafwerar's avatar
wafwerar 已提交
148
    fprintf(stderr, "==%s[%d]%s():[%p]==out of memory\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
F
freemine 已提交
149 150 151
    abort();
  }

152
#ifdef SEM_USE_PTHREAD
wafwerar's avatar
wafwerar 已提交
153
  int r = taosThreadMutexInit(&p->lock, NULL);
154 155
  do {
    if (r) break;
wafwerar's avatar
wafwerar 已提交
156
    r = taosThreadCondInit(&p->cond, NULL);
157
    if (r) {
wafwerar's avatar
wafwerar 已提交
158
      taosThreadMutexDestroy(&p->lock);
159 160 161 162 163
      break;
    }
    p->val = value;
  } while (0);
  if (r) {
wafwerar's avatar
wafwerar 已提交
164
    fprintf(stderr, "==%s[%d]%s():[%p]==not created\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
165 166 167 168 169 170
    abort();
  }
#elif defined(SEM_USE_POSIX)
  static size_t tick = 0;
  do {
    size_t id = atomic_add_fetch_64(&tick, 1);
S
Shengliang Guan 已提交
171
    if (id == SEM_VALUE_MAX) {
172 173 174
      atomic_store_64(&tick, 0);
      id = 0;
    }
S
Shengliang Guan 已提交
175
    char name[NAME_MAX - 4];
176
    snprintf(name, sizeof(name), "/t%ld", id);
S
Shengliang Guan 已提交
177 178 179
    p->sem = sem_open(name, O_CREAT | O_EXCL, pshared, value);
    p->id = id;
    if (p->sem != SEM_FAILED) break;
S
TD-4088  
Shengliang Guan 已提交
180
    int e = errno;
S
Shengliang Guan 已提交
181 182
    if (e == EEXIST) continue;
    if (e == EINTR) continue;
wafwerar's avatar
wafwerar 已提交
183
    fprintf(stderr, "==%s[%d]%s():[%p]==not created[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem, e,
S
Shengliang Guan 已提交
184
            strerror(e));
185
    abort();
S
Shengliang Guan 已提交
186
  } while (p->sem == SEM_FAILED);
187
#elif defined(SEM_USE_SEM)
wafwerar's avatar
wafwerar 已提交
188
  taosThreadOnce(&sem_once, once_init);
S
Shengliang Guan 已提交
189
  if (sem_inited != 1) {
wafwerar's avatar
wafwerar 已提交
190
    fprintf(stderr, "==%s[%d]%s():[%p]==internal resource init failed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
191 192 193
    errno = ENOMEM;
    return -1;
  }
F
freemine 已提交
194
  kern_return_t ret = semaphore_create(sem_port, &p->sem, SYNC_POLICY_FIFO, value);
195
  if (ret != KERN_SUCCESS) {
wafwerar's avatar
wafwerar 已提交
196
    fprintf(stderr, "==%s[%d]%s():[%p]==semophore_create failed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
197 198 199
    // we fail-fast here, because we have less-doc about semaphore_create for the moment
    abort();
  }
S
Shengliang Guan 已提交
200
#else   // SEM_USE_PTHREAD
F
freemine 已提交
201 202
  p->sem = dispatch_semaphore_create(value);
  if (p->sem == NULL) {
wafwerar's avatar
wafwerar 已提交
203
    fprintf(stderr, "==%s[%d]%s():[%p]==not created\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
F
freemine 已提交
204 205
    abort();
  }
S
Shengliang Guan 已提交
206
#endif  // SEM_USE_PTHREAD
207

F
freemine 已提交
208 209 210
  p->valid = 1;

  *sem = p;
S
slguan 已提交
211 212 213 214

  return 0;
}

S
TD-4088  
Shengliang Guan 已提交
215
int tsem_wait(tsem_t *sem) {
F
freemine 已提交
216
  if (!*sem) {
wafwerar's avatar
wafwerar 已提交
217
    fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
F
freemine 已提交
218 219 220 221
    abort();
  }
  struct tsem_s *p = *sem;
  if (!p->valid) {
wafwerar's avatar
wafwerar 已提交
222
    fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
F
freemine 已提交
223 224
    abort();
  }
225
#ifdef SEM_USE_PTHREAD
wafwerar's avatar
wafwerar 已提交
226
  if (taosThreadMutexLock(&p->lock)) {
wafwerar's avatar
wafwerar 已提交
227
    fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
228 229 230 231
    abort();
  }
  p->val -= 1;
  if (p->val < 0) {
wafwerar's avatar
wafwerar 已提交
232
    if (taosThreadCondWait(&p->cond, &p->lock)) {
wafwerar's avatar
wafwerar 已提交
233
      fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
234 235 236
      abort();
    }
  }
wafwerar's avatar
wafwerar 已提交
237
  if (taosThreadMutexUnlock(&p->lock)) {
wafwerar's avatar
wafwerar 已提交
238
    fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
239 240 241 242 243 244 245
    abort();
  }
  return 0;
#elif defined(SEM_USE_POSIX)
  return sem_wait(p->sem);
#elif defined(SEM_USE_SEM)
  return semaphore_wait(p->sem);
S
Shengliang Guan 已提交
246
#else   // SEM_USE_PTHREAD
F
freemine 已提交
247
  return dispatch_semaphore_wait(p->sem, DISPATCH_TIME_FOREVER);
S
Shengliang Guan 已提交
248
#endif  // SEM_USE_PTHREAD
S
slguan 已提交
249 250
}

S
TD-4088  
Shengliang Guan 已提交
251
int tsem_post(tsem_t *sem) {
F
freemine 已提交
252
  if (!*sem) {
wafwerar's avatar
wafwerar 已提交
253
    fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
F
freemine 已提交
254 255 256 257
    abort();
  }
  struct tsem_s *p = *sem;
  if (!p->valid) {
wafwerar's avatar
wafwerar 已提交
258
    fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
F
freemine 已提交
259 260
    abort();
  }
261
#ifdef SEM_USE_PTHREAD
wafwerar's avatar
wafwerar 已提交
262
  if (taosThreadMutexLock(&p->lock)) {
wafwerar's avatar
wafwerar 已提交
263
    fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
264 265 266 267
    abort();
  }
  p->val += 1;
  if (p->val <= 0) {
wafwerar's avatar
wafwerar 已提交
268
    if (taosThreadCondSignal(&p->cond)) {
wafwerar's avatar
wafwerar 已提交
269
      fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
270 271 272
      abort();
    }
  }
wafwerar's avatar
wafwerar 已提交
273
  if (taosThreadMutexUnlock(&p->lock)) {
wafwerar's avatar
wafwerar 已提交
274
    fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
275 276 277 278 279 280 281
    abort();
  }
  return 0;
#elif defined(SEM_USE_POSIX)
  return sem_post(p->sem);
#elif defined(SEM_USE_SEM)
  return semaphore_signal(p->sem);
S
Shengliang Guan 已提交
282
#else   // SEM_USE_PTHREAD
F
freemine 已提交
283
  return dispatch_semaphore_signal(p->sem);
S
Shengliang Guan 已提交
284
#endif  // SEM_USE_PTHREAD
F
freemine 已提交
285 286
}

S
TD-4088  
Shengliang Guan 已提交
287
int tsem_destroy(tsem_t *sem) {
wafwerar's avatar
wafwerar 已提交
288
  // fprintf(stderr, "==%s[%d]%s():[%p]==destroying\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
F
freemine 已提交
289
  if (!*sem) {
wafwerar's avatar
wafwerar 已提交
290
    // fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
291
    // abort();
F
freemine 已提交
292 293 294 295
    return 0;
  }
  struct tsem_s *p = *sem;
  if (!p->valid) {
wafwerar's avatar
wafwerar 已提交
296
    // fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
297 298 299 300
    // abort();
    return 0;
  }
#ifdef SEM_USE_PTHREAD
wafwerar's avatar
wafwerar 已提交
301
  if (taosThreadMutexLock(&p->lock)) {
wafwerar's avatar
wafwerar 已提交
302
    fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
303 304 305
    abort();
  }
  p->valid = 0;
wafwerar's avatar
wafwerar 已提交
306
  if (taosThreadCondDestroy(&p->cond)) {
wafwerar's avatar
wafwerar 已提交
307
    fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
308 309
    abort();
  }
wafwerar's avatar
wafwerar 已提交
310
  if (taosThreadMutexUnlock(&p->lock)) {
wafwerar's avatar
wafwerar 已提交
311
    fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
312 313
    abort();
  }
wafwerar's avatar
wafwerar 已提交
314
  if (taosThreadMutexDestroy(&p->lock)) {
wafwerar's avatar
wafwerar 已提交
315
    fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
316 317 318
    abort();
  }
#elif defined(SEM_USE_POSIX)
S
Shengliang Guan 已提交
319
  char name[NAME_MAX - 4];
320
  snprintf(name, sizeof(name), "/t%ld", p->id);
S
TD-4088  
Shengliang Guan 已提交
321
  int r = sem_unlink(name);
322
  if (r) {
S
TD-4088  
Shengliang Guan 已提交
323
    int e = errno;
wafwerar's avatar
wafwerar 已提交
324
    fprintf(stderr, "==%s[%d]%s():[%p]==unlink failed[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem, e,
S
Shengliang Guan 已提交
325
            strerror(e));
F
freemine 已提交
326 327
    abort();
  }
328 329
#elif defined(SEM_USE_SEM)
  semaphore_destroy(sem_port, p->sem);
S
Shengliang Guan 已提交
330 331
#else   // SEM_USE_PTHREAD
#endif  // SEM_USE_PTHREAD
F
freemine 已提交
332 333

  p->valid = 0;
wafwerar's avatar
wafwerar 已提交
334
  taosMemoryFree(p);
F
freemine 已提交
335 336

  *sem = NULL;
S
slguan 已提交
337 338
  return 0;
}
F
freemine 已提交
339

wafwerar's avatar
wafwerar 已提交
340
bool taosCheckPthreadValid(TdThread thread) {
F
freemine 已提交
341
  uint64_t id = 0;
wafwerar's avatar
wafwerar 已提交
342
  int      r = TdThreadhreadid_np(thread, &id);
F
freemine 已提交
343 344 345 346
  return r ? false : true;
}

int64_t taosGetSelfPthreadId() {
347
  uint64_t id;
wafwerar's avatar
wafwerar 已提交
348
  TdThreadhreadid_np(0, &id);
S
Shengliang Guan 已提交
349
  return (int64_t)id;
F
freemine 已提交
350 351
}

wafwerar's avatar
wafwerar 已提交
352
int64_t taosGetPthreadId(TdThread thread) { return (int64_t)thread; }
F
freemine 已提交
353

wafwerar's avatar
wafwerar 已提交
354
void taosResetPthread(TdThread *thread) { *thread = NULL; }
F
freemine 已提交
355

wafwerar's avatar
wafwerar 已提交
356
bool taosComparePthread(TdThread first, TdThread second) { return taosThreadEqual(first, second) ? true : false; }
F
freemine 已提交
357

S
Shengliang Guan 已提交
358
int32_t taosGetPId() { return (int32_t)getpid(); }
F
freemine 已提交
359

360
int32_t taosGetAppName(char *name, int32_t *len) {
S
Shengliang Guan 已提交
361
  char buf[PATH_MAX + 1];
F
freemine 已提交
362
  buf[0] = '\0';
S
Shengliang Guan 已提交
363
  proc_name(getpid(), buf, sizeof(buf) - 1);
F
freemine 已提交
364 365 366 367 368 369 370
  buf[PATH_MAX] = '\0';
  size_t n = strlen(buf);
  if (len) *len = n;
  if (name) strcpy(name, buf);
  return 0;
}

S
Shengliang Guan 已提交
371 372 373 374 375 376 377
#else

/*
 * linux implementation
 */

#include <sys/syscall.h>
S
Shengliang Guan 已提交
378
#include <unistd.h>
S
Shengliang Guan 已提交
379

wafwerar's avatar
wafwerar 已提交
380
bool taosCheckPthreadValid(TdThread thread) { return thread != 0; }
S
Shengliang Guan 已提交
381 382 383 384 385 386 387 388

int64_t taosGetSelfPthreadId() {
  static __thread int id = 0;
  if (id != 0) return id;
  id = syscall(SYS_gettid);
  return id;
}

wafwerar's avatar
wafwerar 已提交
389 390 391
int64_t taosGetPthreadId(TdThread thread) { return (int64_t)thread; }
void    taosResetPthread(TdThread* thread) { *thread = 0; }
bool    taosComparePthread(TdThread first, TdThread second) { return first == second; }
S
Shengliang Guan 已提交
392 393
int32_t taosGetPId() { return getpid(); }

394
int32_t taosGetAppName(char* name, int32_t* len) {
S
Shengliang Guan 已提交
395 396
  const char* self = "/proc/self/exe";
  char        path[PATH_MAX] = {0};
F
freemine 已提交
397

S
Shengliang Guan 已提交
398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427
  if (readlink(self, path, PATH_MAX) <= 0) {
    return -1;
  }

  path[PATH_MAX - 1] = 0;
  char* end = strrchr(path, '/');
  if (end == NULL) {
    return -1;
  }

  ++end;

  strcpy(name, end);

  if (len != NULL) {
    *len = strlen(name);
  }

  return 0;
}

int32_t tsem_wait(tsem_t* sem) {
  int ret = 0;
  do {
    ret = sem_wait(sem);
  } while (ret != 0 && errno == EINTR);
  return ret;
}

#endif