提交 a73d8a01 编写于 作者: O openeuler-ci-bot 提交者: Gitee

!213 exec: separate stdout and stderr in exec

Merge pull request !213 from lifeng_isula/fix_exec_stderr
......@@ -324,7 +324,8 @@ message RemoteExecRequest {
}
message RemoteExecResponse {
bytes stdout = 1;
bool finish = 2;
bytes stderr = 2;
bool finish = 3;
}
message AttachRequest {
......
......@@ -1004,7 +1004,12 @@ out:
if (stream_response.finish()) {
break;
}
std::cout << stream_response.stdout() << std::flush;
if (!stream_response.stdout().empty()) {
std::cout << stream_response.stdout() << std::flush;
}
if (!stream_response.stderr().empty()) {
std::cerr << stream_response.stderr() << std::flush;
}
}
write_task.stop();
stream->WritesDone();
......
......@@ -616,7 +616,7 @@ Status ContainerServiceImpl::Exec(ServerContext *context, const ExecRequest *req
return Status::CANCELLED;
}
ret = cb->container.exec(container_req, &container_res, -1, nullptr);
ret = cb->container.exec(container_req, &container_res, -1, nullptr, nullptr);
tret = exec_response_to_grpc(container_res, reply);
free_container_exec_request(container_req);
......@@ -629,7 +629,7 @@ Status ContainerServiceImpl::Exec(ServerContext *context, const ExecRequest *req
return Status::OK;
}
ssize_t WriteExecResponseToRemoteClient(void *context, const void *data, size_t len)
ssize_t WriteExecStdoutResponseToRemoteClient(void *context, const void *data, size_t len)
{
if (context == nullptr || data == nullptr || len == 0) {
return 0;
......@@ -644,6 +644,21 @@ ssize_t WriteExecResponseToRemoteClient(void *context, const void *data, size_t
return (ssize_t)len;
}
ssize_t WriteExecStderrResponseToRemoteClient(void *context, const void *data, size_t len)
{
if (context == nullptr || data == nullptr || len == 0) {
return 0;
}
auto stream = static_cast<ServerReaderWriter<RemoteExecResponse, RemoteExecRequest> *>(context);
RemoteExecResponse response;
response.set_stderr((char *)data, len);
if (!stream->Write(response)) {
ERROR("Failed to write request to grpc client");
return -1;
}
return (ssize_t)len;
}
class RemoteExecReceiveFromClientTask : public StoppableThread {
public:
RemoteExecReceiveFromClientTask() = default;
......@@ -720,11 +735,15 @@ Status ContainerServiceImpl::RemoteExec(ServerContext *context,
});
}
struct io_write_wrapper stringWriter = { 0 };
stringWriter.context = (void *)stream;
stringWriter.write_func = WriteExecResponseToRemoteClient;
stringWriter.close_func = nullptr;
(void)cb->container.exec(container_req, &container_res, read_pipe_fd[0], &stringWriter);
struct io_write_wrapper StdoutstringWriter = { 0 };
StdoutstringWriter.context = (void *)stream;
StdoutstringWriter.write_func = WriteExecStdoutResponseToRemoteClient;
StdoutstringWriter.close_func = nullptr;
struct io_write_wrapper StderrstringWriter = { 0 };
StderrstringWriter.context = (void *)stream;
StderrstringWriter.write_func = WriteExecStderrResponseToRemoteClient;
StderrstringWriter.close_func = nullptr;
(void)cb->container.exec(container_req, &container_res, read_pipe_fd[0], &StdoutstringWriter, &StderrstringWriter);
RemoteExecResponse finish_response;
finish_response.set_finish(true);
......
......@@ -945,7 +945,7 @@ static void rest_exec_cb(evhtp_request_t *req, void *arg)
goto out;
}
(void)cb->container.exec(crequest, &cresponse, -1, NULL);
(void)cb->container.exec(crequest, &cresponse, -1, NULL, NULL);
evhtp_send_exec_repsponse(req, cresponse, RESTFUL_RES_OK);
out:
......
......@@ -113,7 +113,7 @@ typedef struct {
int(*list)(const container_list_request *request, container_list_response **response);
int(*exec)(const container_exec_request *request, container_exec_response **response,
int stdinfd, struct io_write_wrapper *stdout);
int stdinfd, struct io_write_wrapper *stdout, struct io_write_wrapper *stderr);
int(*attach)(const container_attach_request *request, container_attach_response **response,
int stdinfd, struct io_write_wrapper *stdout, struct io_write_wrapper *stderr);
......
......@@ -1339,7 +1339,8 @@ void CRIRuntimeServiceImpl::ExecSync(const std::string &containerID,
const google::protobuf::RepeatedPtrField<std::string> &cmd, int64_t timeout,
runtime::v1alpha2::ExecSyncResponse *reply, Errors &error)
{
struct io_write_wrapper stringWriter = { 0 };
struct io_write_wrapper StdoutstringWriter = { 0 };
struct io_write_wrapper StderrstringWriter = { 0 };
if (m_cb == nullptr || m_cb->container.exec == nullptr) {
error.SetError("Unimplemented callback");
......@@ -1365,9 +1366,12 @@ void CRIRuntimeServiceImpl::ExecSync(const std::string &containerID,
goto cleanup;
}
stringWriter.context = (void *)reply->mutable_stdout();
stringWriter.write_func = WriteToString;
if (m_cb->container.exec(request, &response, -1, &stringWriter)) {
StdoutstringWriter.context = (void *)reply->mutable_stdout();
StdoutstringWriter.write_func = WriteToString;
StderrstringWriter.context = (void *)reply->mutable_stderr();
StderrstringWriter.write_func = WriteToString;
if (m_cb->container.exec(request, &response, -1, &StdoutstringWriter, &StderrstringWriter)) {
if (response != nullptr && response->errmsg != nullptr) {
error.SetError(response->errmsg);
} else {
......
......@@ -608,8 +608,8 @@ static int container_exec_cb_check(const container_exec_request *request, contai
}
static int exec_prepare_console(container_t *cont, const container_exec_request *request, int stdinfd,
struct io_write_wrapper *stdout_handler, char **fifos,
char **fifopath, int *sync_fd, pthread_t *thread_id)
struct io_write_wrapper *stdout_handler, struct io_write_wrapper *stderr_handler,
char **fifos, char **fifopath, int *sync_fd, pthread_t *thread_id)
{
int ret = 0;
const char *id = cont->common_config->id;
......@@ -629,7 +629,7 @@ static int exec_prepare_console(container_t *cont, const container_exec_request
goto out;
}
if (ready_copy_io_data(*sync_fd, false, request->stdin, request->stdout, request->stderr,
stdinfd, stdout_handler, NULL, (const char **)fifos, thread_id)) {
stdinfd, stdout_handler, stderr_handler, (const char **)fifos, thread_id)) {
ret = -1;
goto out;
}
......@@ -730,7 +730,7 @@ out:
}
static int container_exec_cb(const container_exec_request *request, container_exec_response **response,
int stdinfd, struct io_write_wrapper *stdout_handler)
int stdinfd, struct io_write_wrapper *stdout_handler, struct io_write_wrapper *stderr_handler)
{
int exit_code = 0;
int sync_fd = -1;
......@@ -802,7 +802,8 @@ static int container_exec_cb(const container_exec_request *request, container_ex
}
}
if (exec_prepare_console(cont, request, stdinfd, stdout_handler, fifos, &fifopath, &sync_fd, &thread_id)) {
if (exec_prepare_console(cont, request, stdinfd, stdout_handler, stderr_handler, fifos, &fifopath, &sync_fd,
&thread_id)) {
cc = ISULAD_ERR_EXEC;
goto pack_response;
}
......
......@@ -516,7 +516,8 @@ void *health_check_run(void *arg)
char **cmd_slice = NULL;
char output[REV_BUF_SIZE] = { 0 };
char timebuffer[TIME_STR_SIZE] = { 0 };
struct io_write_wrapper ctx = { 0 };
struct io_write_wrapper Stdoutctx = { 0 };
struct io_write_wrapper Stderrctx = { 0 };
container_t *cont = NULL;
service_callback_t *cb = NULL;
container_exec_request *container_req = NULL;
......@@ -559,7 +560,7 @@ void *health_check_run(void *arg)
container_req->tty = false;
container_req->attach_stdin = false;
container_req->attach_stdout = true;
container_req->attach_stderr = false;
container_req->attach_stderr = true;
container_req->timeout = timeout_with_default(config->health_check->timeout, DEFAULT_PROBE_TIMEOUT) / Time_Second;
container_req->container_id = util_strdup_s(cont->common_config->id);
container_req->argv = cmd_slice;
......@@ -575,10 +576,13 @@ void *health_check_run(void *arg)
}
result->start = util_strdup_s(timebuffer);
ctx.context = (void *)output;
ctx.write_func = write_to_string;
ctx.close_func = NULL;
ret = cb->container.exec(container_req, &container_res, -1, &ctx);
Stdoutctx.context = (void *)output;
Stdoutctx.write_func = write_to_string;
Stdoutctx.close_func = NULL;
Stderrctx.context = (void *)output;
Stderrctx.write_func = write_to_string;
Stderrctx.close_func = NULL;
ret = cb->container.exec(container_req, &container_res, -1, &Stdoutctx, &Stderrctx);
if (ret != 0) {
health_check_exec_failed_handle(container_res, result);
} else {
......
......@@ -48,7 +48,7 @@ int AttachServe::Execute(struct lws *wsi, const std::string &token,
}
struct io_write_wrapper stringWriter = { 0 };
stringWriter.context = (void *)wsi;
stringWriter.write_func = WsWriteToClient;
stringWriter.write_func = WsWriteStdoutToClient;
stringWriter.close_func = closeWsConnect;
container_req->attach_stderr = false;
int ret = cb->container.attach(container_req, &container_res,
......
......@@ -45,11 +45,14 @@ int ExecServe::Execute(struct lws *wsi, const std::string &token,
ERROR("Failed to transform grpc request!");
return -1;
}
struct io_write_wrapper stringWriter = { 0 };
stringWriter.context = (void *)wsi;
stringWriter.write_func = WsWriteToClient;
struct io_write_wrapper StdoutstringWriter = { 0 };
StdoutstringWriter.context = (void *)wsi;
StdoutstringWriter.write_func = WsWriteStdoutToClient;
struct io_write_wrapper StderrstringWriter = { 0 };
StderrstringWriter.context = (void *)wsi;
StderrstringWriter.write_func = WsWriteStderrToClient;
int ret = cb->container.exec(container_req, &container_res,
container_req->attach_stdin ? read_pipe_fd : -1, &stringWriter);
container_req->attach_stdin ? read_pipe_fd : -1, &StdoutstringWriter, &StderrstringWriter);
if (ret != 0) {
std::string message;
if (container_res != nullptr && container_res->errmsg != nullptr) {
......@@ -57,11 +60,11 @@ int ExecServe::Execute(struct lws *wsi, const std::string &token,
} else {
message = "Failed to call exec container callback. ";
}
WsWriteToClient(wsi, message.c_str(), message.length());
WsWriteStdoutToClient(wsi, message.c_str(), message.length());
}
if (container_res != nullptr && container_res->exit_code != 0) {
std::string exit_info = "Exit code :" + std::to_string((int)container_res->exit_code) + "\n";
WsWriteToClient(wsi, exit_info.c_str(), exit_info.length());
WsWriteStdoutToClient(wsi, exit_info.c_str(), exit_info.length());
}
free_container_exec_request(container_req);
free_container_exec_response(container_res);
......
......@@ -410,7 +410,7 @@ void WebsocketServer::Wait()
}
ssize_t WsWriteToClient(void *context, const void *data, size_t len)
ssize_t WsWriteStdoutToClient(void *context, const void *data, size_t len)
{
const int RETRIES = 10;
const int CHECK_PERIOD_SECOND = 1;
......@@ -456,6 +456,52 @@ ssize_t WsWriteToClient(void *context, const void *data, size_t len)
return (ssize_t)len;
}
ssize_t WsWriteStderrToClient(void *context, const void *data, size_t len)
{
const int RETRIES = 10;
const int CHECK_PERIOD_SECOND = 1;
const int TRIGGER_PERIOD_MS = 100;
struct lws *wsi = static_cast<struct lws *>(context);
WebsocketServer *server = WebsocketServer::GetInstance();
server->LockAllWsSession();
auto it = server->GetWsisData().find(wsi);
if (it == server->GetWsisData().end()) {
ERROR("invalid session!");
server->UnlockAllWsSession();
return 0;
}
it->second.SetProcessingStatus(true);
server->UnlockAllWsSession();
server->SetLwsSendedFlag(wsi, false);
it->second.buf_mutex->lock();
auto &buf = it->second.buf;
// Determine if it is standard output channel or error channel?
(void)memset(buf, 0, LWS_PRE + MAX_MSG_BUFFER_SIZE + 1);
buf[LWS_PRE] = STDERRCHANNEL;
(void)memcpy(&buf[LWS_PRE + 1], (void *)data, len);
auto start = std::chrono::system_clock::now();
lws_callback_on_writable(wsi);
it->second.buf_mutex->unlock();
int count = 0;
while (!it->second.sended && count < RETRIES) {
auto end = std::chrono::system_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
double spend_time = static_cast<double>(duration.count()) * std::chrono::microseconds::period::num /
std::chrono::microseconds::period::den;
if (spend_time > CHECK_PERIOD_SECOND) {
lws_callback_on_writable(wsi);
std::this_thread::sleep_for(std::chrono::milliseconds(TRIGGER_PERIOD_MS));
start = std::chrono::system_clock::now();
count++;
}
std::this_thread::sleep_for(std::chrono::milliseconds(TRIGGER_PERIOD_MS));
}
it->second.SetProcessingStatus(false);
return (ssize_t)len;
}
int closeWsConnect(void *context, char **err)
{
(void)err;
......
......@@ -119,7 +119,8 @@ private:
int m_listenPort;
};
ssize_t WsWriteToClient(void *context, const void *data, size_t len);
ssize_t WsWriteStdoutToClient(void *context, const void *data, size_t len);
ssize_t WsWriteStderrToClient(void *context, const void *data, size_t len);
int closeWsConnect(void *context, char **err);
#endif /* __WEBSOCKET_SERVER_H_ */
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册