/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include #include #include #include #include namespace apache { namespace thrift { namespace transport { TOverlappedWorkItem::TOverlappedWorkItem() : SLIST_ENTRY(), action(UNKNOWN), h(INVALID_HANDLE_VALUE), buffer(NULL), buffer_len(0), overlap(), last_error(0), success(TRUE) { } void TOverlappedWorkItem::reset(uint8_t* buf, uint32_t len, HANDLE event) { memset(&overlap, 0, sizeof(overlap)); overlap.hEvent = event; buffer = buf; buffer_len = len; last_error = 0; success = FALSE; } uint32_t TOverlappedWorkItem::overlappedResults(bool signal_failure) { DWORD bytes = 0; BOOL result = ::GetOverlappedResult(h, &overlap, &bytes, TRUE); if (signal_failure && !result) // get overlapped error case { GlobalOutput.perror("TPipe ::GetOverlappedResult errored GLE=", ::GetLastError()); throw TTransportException(TTransportException::UNKNOWN, "TPipe: GetOverlappedResult failed"); } return bytes; } bool TOverlappedWorkItem::process() { BOOST_SCOPE_EXIT((&doneSubmittingEvent)) { SetEvent(doneSubmittingEvent.h); } BOOST_SCOPE_EXIT_END switch (action) { case (CONNECT): success = ::ConnectNamedPipe(h, &overlap); if (success == FALSE) last_error = ::GetLastError(); return true; case (READ): success = ::ReadFile(h, buffer, buffer_len, NULL, &overlap); if (success == FALSE) last_error = ::GetLastError(); return true; case (CANCELIO): success = ::CancelIo(h); if (success == FALSE) last_error = ::GetLastError(); return true; case (STOP): default: return false; } } void TOverlappedSubmissionThread::addWorkItem(TOverlappedWorkItem* item) { InterlockedPushEntrySList(&workList_, item); SetEvent(workAvailableEvent_.h); WaitForSingleObject(item->doneSubmittingEvent.h, INFINITE); } TOverlappedSubmissionThread* TOverlappedSubmissionThread::acquire_instance() { TAutoCrit lock(instanceGuard_); if (instance_ == NULL) { assert(instanceRefCount_ == 0); instance_ = new TOverlappedSubmissionThread; } ++instanceRefCount_; return instance_; } void TOverlappedSubmissionThread::release_instance() { TAutoCrit lock(instanceGuard_); if (--instanceRefCount_ == 0) { delete instance_; instance_ = NULL; } } TOverlappedSubmissionThread::TOverlappedSubmissionThread() { stopItem_.action = TOverlappedWorkItem::STOP; InitializeSListHead(&workList_); thread_ = (HANDLE)_beginthreadex(NULL, 0, thread_proc, this, 0, NULL); if (thread_ == 0) { GlobalOutput.perror("TOverlappedSubmissionThread unable to create thread, errno=", errno); throw TTransportException(TTransportException::NOT_OPEN, " TOverlappedSubmissionThread unable to create thread"); } } TOverlappedSubmissionThread::~TOverlappedSubmissionThread() { addWorkItem(&stopItem_); ::WaitForSingleObject(thread_, INFINITE); CloseHandle(thread_); } void TOverlappedSubmissionThread::run() { for (;;) { WaitForSingleObject(workAvailableEvent_.h, INFINITE); // todo check result SLIST_ENTRY* entry = NULL; while ((entry = InterlockedPopEntrySList(&workList_)) != NULL) { TOverlappedWorkItem& item = *static_cast(entry); if (!item.process()) return; } } } unsigned __stdcall TOverlappedSubmissionThread::thread_proc(void* addr) { static_cast(addr)->run(); return 0; } TCriticalSection TOverlappedSubmissionThread::instanceGuard_; TOverlappedSubmissionThread* TOverlappedSubmissionThread::instance_; uint32_t TOverlappedSubmissionThread::instanceRefCount_ = 0; } } } // apach::thrift::transport