telemetry.py 6.9 KB
Newer Older
1 2 3 4
import taos
import sys
import time
import socket
wafwerar's avatar
wafwerar 已提交
5
# import pexpect
6 7 8 9 10
import os
import http.server
import gzip
import threading
import json
11
import pickle
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103

from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *

telemetryPort = '80'

#{
#	"instanceId":	"5cf4cd7a-acd4-43ba-8b0d-e84395b76a65",
#	"reportVersion":	1,
#	"os":	"Ubuntu 20.04.3 LTS",
#	"cpuModel":	"Intel(R) Xeon(R) CPU E5-2620 v3 @ 2.40GHz",
#	"numOfCpu":	6,
#	"memory":	"65860292 kB",
#	"version":	"3.0.0.0",
#	"buildInfo":	"Built at 2022-05-07 14:09:02",
#	"gitInfo":	"2139ccceb0946cde86b6b553b11e338f1ba437e5",
#	"email":	"user@taosdata.com",
#	"numOfDnode":	1,
#	"numOfMnode":	1,
#	"numOfVgroup":	32,
#	"numOfDatabase":	2,
#	"numOfSuperTable":	0,
#	"numOfChildTable":	100,
#	"numOfColumn":	200,
#	"numOfPoint":	300,
#	"totalStorage":	400,
#	"compStorage":	500
#}

def telemetryInfoCheck(infoDict=''):
    if  "instanceId" not in infoDict or len(infoDict["instanceId"]) == 0:
        tdLog.exit("instanceId is null!")

    if "reportVersion" not in infoDict or infoDict["reportVersion"] != 1:
        tdLog.exit("reportVersion is null!")

    if "os" not in infoDict:
        tdLog.exit("os is null!")

    if "cpuModel" not in infoDict:
        tdLog.exit("cpuModel is null!")

    if "numOfCpu" not in infoDict or infoDict["numOfCpu"] == 0:
        tdLog.exit("numOfCpu is null!")

    if "memory" not in infoDict:
        tdLog.exit("memory is null!")

    if "version" not in infoDict:
        tdLog.exit("version is null!")

    if "buildInfo" not in infoDict:
        tdLog.exit("buildInfo is null!")

    if "gitInfo" not in infoDict:
        tdLog.exit("gitInfo is null!")

    if "email" not in infoDict:
        tdLog.exit("email is not exists!")

    if "numOfDnode" not in infoDict or infoDict["numOfDnode"] < 1:
        tdLog.exit("numOfDnode is null!")

    if "numOfMnode" not in infoDict or infoDict["numOfMnode"] < 1:
        tdLog.exit("numOfMnode is null!")

    if "numOfVgroup" not in infoDict or infoDict["numOfVgroup"] <= 0:
        tdLog.exit("numOfVgroup is null!")

    if "numOfDatabase" not in infoDict or infoDict["numOfDatabase"] <= 0:
        tdLog.exit("numOfDatabase is null!")

    if "numOfSuperTable" not in infoDict or infoDict["numOfSuperTable"] < 0:
        tdLog.exit("numOfSuperTable is null!")

    if "numOfChildTable" not in infoDict or infoDict["numOfChildTable"] < 0:
        tdLog.exit("numOfChildTable is null!")

    if "numOfColumn" not in infoDict or infoDict["numOfColumn"] < 0:
        tdLog.exit("numOfColumn is null!")

    if "numOfPoint" not in infoDict or infoDict["numOfPoint"] < 0:
        tdLog.exit("numOfPoint is null!")

    if "totalStorage" not in infoDict or infoDict["totalStorage"] < 0:
        tdLog.exit("totalStorage is null!")

    if "compStorage" not in infoDict or infoDict["compStorage"] < 0:
        tdLog.exit("compStorage is null!")

G
Ganlin Zhao 已提交
104 105

class RequestHandlerImpl(http.server.BaseHTTPRequestHandler):
106 107 108 109 110 111 112 113 114 115 116 117 118 119
    def do_GET(self):
        """
        process GET request
        """

    def do_POST(self):
        """
        process POST request
        """
        contentEncoding = self.headers["Content-Encoding"]

        if contentEncoding == 'gzip':
            req_body = self.rfile.read(int(self.headers["Content-Length"]))
            plainText = gzip.decompress(req_body).decode()
G
Ganlin Zhao 已提交
120
        else:
121 122 123 124 125
            plainText = self.rfile.read(int(self.headers["Content-Length"])).decode()

        print("monitor info:\n%s"%plainText)

        # 1. send response code and header
G
Ganlin Zhao 已提交
126
        self.send_response(200)
127 128
        self.send_header("Content-Type", "text/html; charset=utf-8")
        self.end_headers()
G
Ganlin Zhao 已提交
129

130 131
        # 2. send response content
        #self.wfile.write(("Hello World: " + req_body + "\n").encode("utf-8"))
G
Ganlin Zhao 已提交
132

133 134 135 136 137 138
        # 3. check request body info
        infoDict = json.loads(plainText)
        #print("================")
        #print(infoDict)
        telemetryInfoCheck(infoDict)

G
Ganlin Zhao 已提交
139
        # 4. shutdown the server and exit case
140
        assassin = threading.Thread(target=self.server.shutdown)
141 142 143 144 145 146
        assassin.daemon = True
        assassin.start()
        print ("==== shutdown http server ====")

class TDTestCase:
    hostname = socket.gethostname()
147 148 149 150 151 152
    if (platform.system().lower() == 'windows' and not tdDnodes.dnodes[0].remoteIP == ""):
        try:
            config = eval(tdDnodes.dnodes[0].remoteIP)
            hostname = config["host"]
        except Exception:
            hostname = tdDnodes.dnodes[0].remoteIP
153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
    serverPort = '7080'
    rpcDebugFlagVal = '143'
    clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
    clientCfgDict["serverPort"]    = serverPort
    clientCfgDict["firstEp"]       = hostname + ':' + serverPort
    clientCfgDict["secondEp"]      = hostname + ':' + serverPort
    clientCfgDict["rpcDebugFlag"]  = rpcDebugFlagVal
    clientCfgDict["fqdn"]          = hostname

    updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
    updatecfgDict["clientCfg"]  = clientCfgDict
    updatecfgDict["serverPort"] = serverPort
    updatecfgDict["firstEp"]    = hostname + ':' + serverPort
    updatecfgDict["secondEp"]   = hostname + ':' + serverPort
    updatecfgDict["fqdn"]       = hostname

    updatecfgDict["telemetryReporting"]       = '1'
    updatecfgDict["telemetryServer"]          = hostname
    updatecfgDict["telemetryPort"]            = telemetryPort
    updatecfgDict["telemetryInterval"]        = "3"

    print ("===================: ", updatecfgDict)

    def init(self, conn, logSql):
        tdLog.debug(f"start to excute {__file__}")
        tdSql.init(conn.cursor())
G
Ganlin Zhao 已提交
179

180 181 182 183 184 185 186
    def run(self):  # sourcery skip: extract-duplicate-method, remove-redundant-fstring
        tdSql.prepare()
        # time.sleep(2)
        vgroups = "30"
        sql = "create database db3 vgroups " + vgroups
        tdSql.query(sql)

187 188 189 190 191 192 193 194 195
        # create http server: bing ip/port , and  request processor
        if (platform.system().lower() == 'windows' and not tdDnodes.dnodes[0].remoteIP == ""):
            RequestHandlerImplStr = base64.b64encode(pickle.dumps(RequestHandlerImpl)).decode()
            telemetryInfoCheckStr = base64.b64encode(pickle.dumps(telemetryInfoCheck)).decode()
            cmdStr = "import pickle\nimport http\ntelemetryInfoCheck=pickle.loads(base64.b64decode(\"%s\".encode()))\nRequestHandlerImpl=pickle.loads(base64.b64decode(\"%s\".encode()))\nhttp.server.HTTPServer((\"\", %d), RequestHandlerImpl).serve_forever()"%(telemetryInfoCheckStr,RequestHandlerImplStr,int(telemetryPort))
            tdDnodes.dnodes[0].remoteExec({}, cmdStr)
        else:
            serverAddress = ("", int(telemetryPort))
            http.server.HTTPServer(serverAddress, RequestHandlerImpl).serve_forever()
196 197 198 199 200 201 202 203 204 205 206 207

    def stop(self):
        tdSql.close()
        tdLog.success(f"{__file__} successfully executed")

tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())