提交 f9b19061 编写于 作者: H huolibo

feat(driver): jdbc add tmq seek function

上级 dd131ce1
......@@ -166,6 +166,12 @@ JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTableNam
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_fetchRawBlockImp(JNIEnv *, jobject, jlong, jlong,
jobject, jobject);
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqSeekImp(JNIEnv *, jobject, jlong, jstring, jint,
jlong);
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssignmentImp(JNIEnv *, jobject, jlong,
jstring, jobject);
#ifdef __cplusplus
}
#endif
......
......@@ -17,9 +17,16 @@
#include "jniCommon.h"
#include "taos.h"
int __init_tmq = 0;
int __init_tmq = 0;
jmethodID g_offsetCallback;
jclass g_assignmentClass;
jmethodID g_assignmentConstructor;
jmethodID g_assignmentSetVgId;
jmethodID g_assignmentSetCurrentOffset;
jmethodID g_assignmentSetBegin;
jmethodID g_assignmentSetEnd;
void tmqGlobalMethod(JNIEnv *env) {
// make sure init function executed once
switch (atomic_val_compare_exchange_32(&__init_tmq, 0, 1)) {
......@@ -46,6 +53,38 @@ void tmqGlobalMethod(JNIEnv *env) {
jniDebug("tmq method register finished");
}
int __init_assignment = 0;
void tmqAssignmentMethod(JNIEnv *env) {
// make sure init function executed once
switch (atomic_val_compare_exchange_32(&__init_assignment, 0, 1)) {
case 0:
break;
case 1:
do {
taosMsleep(0);
} while (atomic_load_32(&__init_assignment) == 1);
case 2:
return;
}
if (g_vm == NULL) {
(*env)->GetJavaVM(env, &g_vm);
}
jclass assignment = (*env)->FindClass(env, "com/taosdata/jdbc/tmq/Assignment");
g_assignmentClass = (*env)->NewGlobalRef(env, assignment);
g_assignmentConstructor = (*env)->GetMethodID(env, g_assignmentClass, "<init>", "()V");
g_assignmentSetVgId = (*env)->GetMethodID(env, g_assignmentClass, "setVgId", "(I)V"); // int
g_assignmentSetCurrentOffset = (*env)->GetMethodID(env, g_assignmentClass, "setCurrentOffset", "(J)V"); // long
g_assignmentSetBegin = (*env)->GetMethodID(env, g_assignmentClass, "setBegin", "(J)V"); // long
g_assignmentSetEnd = (*env)->GetMethodID(env, g_assignmentClass, "setEnd", "(J)V"); // long
(*env)->DeleteLocalRef(env, assignment);
atomic_store_32(&__init_assignment, 2);
jniDebug("tmq method assignment finished");
}
// deprecated
void commit_cb(tmq_t *tmq, int32_t code, void *param) {
JNIEnv *env = NULL;
......@@ -266,8 +305,9 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JN
tmq_commit_async(tmq, res, commit_cb, consumer);
}
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsync(JNIEnv *env, jobject jobj, jlong jtmq,
jlong jres, jobject offset) {
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsync(JNIEnv *env, jobject jobj,
jlong jtmq, jlong jres,
jobject offset) {
tmqGlobalMethod(env);
tmq_t *tmq = (tmq_t *)jtmq;
if (tmq == NULL) {
......@@ -369,7 +409,8 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_fetchRawBlockImp(
jniDebug("jobj:%p, conn:%p, resultset:%p, no data to retrieve", jobj, tscon, (void *)res);
return JNI_FETCH_END;
} else {
jniError("jobj:%p, conn:%p, query interrupted, tmq fetch block error code:%d, msg:%s", jobj, tscon, error_code, taos_errstr(tres));
jniError("jobj:%p, conn:%p, query interrupted, tmq fetch block error code:%d, msg:%s", jobj, tscon, error_code,
taos_errstr(tres));
return JNI_RESULT_SET_NULL;
}
}
......@@ -399,3 +440,72 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_fetchRawBlockImp(
(*env)->CallVoidMethod(env, rowobj, g_blockdataSetByteArrayFp, jniFromNCharToByteArray(env, (char *)data, len));
return JNI_SUCCESS;
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqSeekImp(JNIEnv *env, jobject jobj, jlong jtmq,
jstring jtopic, jint partition,
jlong offset) {
tmq_t *tmq = (tmq_t *)jtmq;
if (tmq == NULL) {
jniDebug("jobj:%p, tmq is closed", jobj);
return TMQ_CONSUMER_NULL;
}
if (jtopic == NULL) {
jniDebug("jobj:%p, topic is null", jobj);
return TMQ_TOPIC_NULL;
}
const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);
int32_t res = tmq_offset_seek(tmq, topicName, partition, offset);
if (res != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, tmq seek error, code:%d, msg:%s", jobj, res, tmq_err2str(res));
}
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
return (jint)res;
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssignmentImp(JNIEnv *env, jobject jobj,
jlong jtmq, jstring jtopic,
jobject jarrayList) {
tmqAssignmentMethod(env);
tmq_t *tmq = (tmq_t *)jtmq;
if (tmq == NULL) {
jniDebug("jobj:%p, tmq is closed", jobj);
return TMQ_CONSUMER_NULL;
}
if (jtopic == NULL) {
jniDebug("jobj:%p, topic is null", jobj);
return TMQ_TOPIC_NULL;
}
const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);
tmq_topic_assignment *pAssign = NULL;
int32_t numOfAssignment = 0;
int32_t res = tmq_get_topic_assignment(tmq, topicName, &pAssign, &numOfAssignment);
if (res != TSDB_CODE_SUCCESS) {
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
jniError("jobj:%p, tmq get topic assignment error, topic:%s, code:%d, msg:%s", jobj, topicName, res,
tmq_err2str(res));
taosMemoryFree(pAssign);
return (jint)res;
}
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
for (int i = 0; i < numOfAssignment; ++i) {
tmq_topic_assignment assignment = pAssign[i];
jobject jassignment = (*env)->NewObject(env, g_assignmentClass, g_assignmentConstructor);
(*env)->CallVoidMethod(env, jassignment, g_assignmentSetVgId, assignment.vgId);
(*env)->CallVoidMethod(env, jassignment, g_assignmentSetCurrentOffset, assignment.currentOffset);
(*env)->CallVoidMethod(env, jassignment, g_assignmentSetBegin, assignment.begin);
(*env)->CallVoidMethod(env, jassignment, g_assignmentSetEnd, assignment.end);
(*env)->CallBooleanMethod(env, jarrayList, g_arrayListAddFp, jassignment);
}
taosMemoryFree(pAssign);
return JNI_SUCCESS;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册