# Query MySQL and Elasticsearch (ES) with Python and write the data to a file.
import datetime
import json
import time
import pymysql
import requests
def select_active_users(start_time, end_time):
    """Return the users whose last login falls inside a time window.

    Queries ``user_login_status`` for rows whose ``last_time`` lies between
    ``start_time`` and ``end_time`` (SQL ``BETWEEN``, so both ends are
    inclusive).

    Args:
        start_time: Lower bound of the window; the caller in this file passes
            a ``"%Y-%m-%d %H:%M:%S"`` string.
        end_time: Upper bound of the window, same format.

    Returns:
        The rows returned by ``cursor.fetchall()`` on a ``DictCursor``; each
        row is a dict with a ``user_id`` key.
    """
    # Create the database connection.
    conn = pymysql.connect(
        # Alternative connection settings, kept for reference:
        # host='i9p6z45927c1tw849.mysql.rds.ops.console.avic-internal.com',
        # port=3306,
        # user="igmp",
        # passwd="d",
        # db="resource_admin"
        host='10.2.1.5',
        port=30306,
        user="root",
        passwd="",
        db="koal_audit_dev"
    )
    try:
        print("mysql connect succ!")
        # Let the driver quote and escape the bounds instead of interpolating
        # them into the SQL text by hand.
        sql = 'SELECT user_id FROM user_login_status where last_time BETWEEN %s and %s'
        params = (start_time, end_time)
        with conn.cursor(pymysql.cursors.DictCursor) as cursor:
            cursor.execute(sql, params)
            # mogrify() renders the statement exactly as it is sent to the server.
            print(cursor.mogrify(sql, params))
            rows = cursor.fetchall()
    finally:
        # Always release the connection, even when the query fails.
        conn.close()
    print("list", rows)
    return rows
def get_result(now_time, user_id, _30_ago_es_time):
    """Run the Elasticsearch aggregation query and return its bucket counts.

    NOTE(review): ``now_time``, ``user_id`` and ``_30_ago_es_time`` are
    currently unused. The active query body is a fixed literal (fixed time
    range, ``host_name`` term, ``uptime`` aggregation); the parameterized
    template that would consume these arguments is commented out below.
    Confirm which query is intended before relying on per-user results.

    Args:
        now_time: Intended upper bound of the ``flink_time`` range (unused).
        user_id: Intended ``user_id`` term filter (unused).
        _30_ago_es_time: Intended lower bound of the range (unused).

    Returns:
        A dict mapping each bucket ``key`` of the ``app_aggs`` aggregation to
        its ``doc_count``.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        KeyError: If the response lacks ``aggregations.app_aggs.buckets``.
    """
    # base_url = 'http://10.129.81.130:9200/idaas_log*/_search'
    base_url = 'http://10.2.0.5:9200/system*/_search'
    # query_body = '{"size": 10,"query": {"bool": {"must": [{"term": {"host_name": "localhost.localdomain"}}]}}}'
    query_body = '{"size":0,"query":{"bool":{"filter":[{"range":{"flink_time":{"gte":"2020-02-04T04:56:01.000Z","lte":"2021-12-04T12:56:01.000Z"}}}],"must":[{"term":{"host_name":"localhost.localdomain"}}]}},"aggs":{"app_aggs":{"terms":{"field":"uptime"}}}}'
    # Parameterized template (disabled):
    # query_body = '{"size":0,"query":{"bool":{"filter":[{"range":{"flink_time":{"gte":"%s","lte":"%s"}}}],"must":[{"term":{"user_id":"%s"}}]}},"aggs":{"app_aggs":{"terms":{"field":"app_id"}}}}'
    # query_body = query_body % (_30_ago_es_time, now_time, user_id)
    # headers = {'content-type': 'application/json', 'Authorization': 'Basic ZWxhc3RpYzpLMGExQDc0MTA='}
    headers = {'content-type': 'application/json', 'Authorization': 'Basic ZWxhc3RpYzoxMjM0NWE='}

    r = requests.get(url=base_url, data=query_body, headers=headers)
    print("r: ", r)
    # Fail with a clear HTTP error instead of an opaque KeyError further down
    # when Elasticsearch rejects the request.
    r.raise_for_status()

    # Decode the response body once and reuse it.
    payload = r.json()
    print("str: ", str(payload))
    buckets = payload["aggregations"]["app_aggs"]["buckets"]
    result_data = {bucket['key']: bucket["doc_count"] for bucket in buckets}
    print("result_data: ", result_data)
    return result_data
def write_result_csv():
    """Append one JSON line per active user to ``user_access_app_result.txt``.

    Looks up the users active within the last ``days`` days via
    ``select_active_users``, calls ``get_result`` for each of them, and
    appends a record ``{"user_id": ..., "data": ...}`` to the output file,
    one record per line. ``data`` holds the JSON-encoded result of
    ``get_result`` (a JSON string nested in the record).

    Despite the function name, the output is newline-delimited JSON, not CSV.

    NOTE(review): the Elasticsearch timestamps are formatted from local time
    but carry a literal ``Z`` suffix — confirm whether UTC is expected.
    """
    days = 180
    sql_time_format = "%Y-%m-%d %H:%M:%S"
    es_time_format = "%Y-%m-%dT%H:%M:%S.000Z"

    # Sample the clock once so every bound of the window is consistent.
    current = datetime.datetime.now()
    window_start = current - datetime.timedelta(days=days)

    now_time = current.strftime(es_time_format)
    print("now_time", now_time)
    window_start_es_time = window_start.strftime(es_time_format)

    active_users = select_active_users(
        window_start.strftime(sql_time_format),
        current.strftime(sql_time_format),
    )
    for row in active_users:
        user_id = row["user_id"]
        app_counts = get_result(now_time, user_id, window_start_es_time)
        record = {'user_id': user_id, 'data': json.dumps(app_counts)}
        # Re-open in append mode for each user so finished records are on
        # disk even if a later iteration fails.
        with open('user_access_app_result.txt', 'a', encoding='utf-8') as f:
            # Terminate each record with a real newline; the previous code
            # wrote the literal letter "n", gluing all records together.
            f.write(json.dumps(record) + "\n")
        # Pause between users, as the original loop did.
        time.sleep(1)
if __name__ == '__main__':
    # Script entry point: print the local wall-clock time before and after
    # the export run.
    now_time = time.strftime("%Y-%m-%d %H:%M:%S")  # defaults to localtime()
    print("start: ", now_time)

    write_result_csv()

    now_time = time.strftime("%Y-%m-%d %H:%M:%S")
    print("end: ", now_time)