Contents: log · PostgreSQL · simplejson · requests · elasticsearch
Install each third-party package with: pip install <package-name>
# log — Python's standard logging module.
import logging

# Module-level logger named after the current module.
logger = logging.getLogger(__name__)

# Configure the root log level: DEBUG, INFO, WARNING or ERROR.
logging.basicConfig(level=logging.INFO)

# Emit messages at each level (a message argument is required).
logging.debug('debug message')
logging.info('info message')
logging.warning('warning message')
logging.error('Protocol problem: %s', 'connection reset')

# Log an exception together with its traceback, then re-raise.
try:
    ...
except Exception as e:
    logger.exception(e)
    raise
logging是python官方模块, https://docs.python.org/3/library/logging.html#logging.debug
PostgreSQL基本使用
import psycopg2

# 1. Connect; `options` sets the PostgreSQL search_path (i.e. the schema).
# Track both handles as None so cleanup only touches what was created —
# the original code raised NameError in `finally` when connect() failed.
pg_db = None
pg_cursor = None
try:
    pg_db = psycopg2.connect(database=database,
                             user=username,
                             password=password,
                             host=host,
                             port=port,
                             options="-c search_path=" + schema)
    pg_cursor = pg_db.cursor()
except Exception as e:
    # Roll back only if a connection was actually established.
    if pg_db is not None:
        pg_db.rollback()
    logger.exception(e)
    raise
finally:
    if pg_cursor is not None:
        pg_cursor.close()
    if pg_db is not None:
        pg_db.close()
#2. Cursor object (conn is an open psycopg2 connection)
cur = conn.cursor()
#3. Create a table
cur.execute("CREATE TABLE test (id serial PRIMARY KEY, num integer, data varchar);")
#4. Parameterized insert — psycopg2 quotes the values safely
cur.execute("INSERT INTO test (num, data) VALUES (%s, %s)",
(100, "abc'def"))
#5. Query
cur.execute("SELECt * FROM test;")
cur.fetchone() # (1, 100, "abc'def")
cur.fetchall() # [(1, 100, "abc'def"), (2, None, 'dada'), (3, 42, 'bar')]
#6. Commit the transaction, then release the cursor and connection
conn.commit()
cur.close()
conn.close()
官方文档 https://www.psycopg.org/docs/usage.html
小技巧
# Batch insert: sends many parameter sets with fewer round-trips
from psycopg2.extras import execute_batch
execute_batch(cur,
"INSERT INTO test (id, v1, v2) VALUES(%s,%s,%s)",
params_list) # [[1,2,3],[1,2,3]] or [(1,2,3),(1,2,3)]
# Insert with named (dict) parameters
cur.execute("INSERT INTO test (num, data) VALUES (%(arg1)s, %(arg2)s)",
{"arg1":200, "arg2": "bcd"})
# Query returning dict-like rows
dict_cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
dict_cur.execute("SELECt * FROM test")
# Each row can be used as a dict, indexed by column name
rec = dict_cur.fetchone()
rec['id'] # 1
rec['num'] # 100
# ... and also as a plain tuple, by position
rec[1] # 100
快速助手文档: https://www.psycopg.org/docs/extras.html
simplejson
基本使用
import simplejson

# Write an object to a JSON file (the original one-liner used the invalid
# form `with f = open(...)` — the `with ... as f` form is required).
with open(path, 'w', encoding='UTF-8') as f:
    simplejson.dump(data_obj, f)

# Serialize an object to a JSON string.
json_str = simplejson.dumps(data_obj)
注意:
simplejson.dump(obj, fp, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, cls=None, indent=None, separators=None, encoding='utf-8', default=None, use_decimal=True, namedtuple_as_object=True, tuple_as_array=True, bigint_as_string=False, sort_keys=False, item_sort_key=None, for_json=None, ignore_nan=False, int_as_string_bitcount=None, iterable_as_array=False, **kw)
simplejson 只支持基本类型 (str, int, long, float, bool, None); 参数 use_decimal 默认为True,会自己解析 decimal.Decimal 类型; 参数 tuple_as_array 默认为True, tuple(和子类)将被编码为 JSON 数组;
其他类型的解析可通过参数 cls 来指定解析类,如下 :
# simplejson.JSONEncoder is the default encoder class; subclass it and
# override default() to serialize types simplejson does not handle itself.
class JsonFileTypeEncoder(simplejson.JSONEncoder):
    def default(self, o):  # called only for types the base encoder rejects
        if isinstance(o, tuple):
            # NOTE(review): assumes a tuple whose first element is a
            # datetime; tuples reach here only when tuple_as_array=False
            # is passed — confirm against the callers.
            return o[0].strftime("%Y-%m-%d %H:%M:%S")
        elif isinstance(o, datetime):
            return o.strftime("%Y-%m-%d")
        elif isinstance(o, timedelta):
            # Drop the fractional-second part, e.g. '1:02:03.45' -> '1:02:03'.
            return str(o).split(".")[0]
        else:
            # Anything else falls back to the base class (which raises
            # TypeError for unsupported types).
            return simplejson.JSONEncoder.default(self, o)

# Usage:
simplejson.dumps(data_obj, cls=JsonFileTypeEncoder)
官方文档 https://simplejson.readthedocs.io/en/latest/
requests
官方文档: https://docs.python-requests.org/zh_CN/latest/user/quickstart.html
引用: https://www.liaoxuefeng.com/wiki/1016959663602400/1183249464292448#0
pip install requests
import requests
r = requests.get('https://www.baidu.com/') # Baidu home page
r.status_code # 200
r.text # 'nnnn...'
# For responses of a known type, e.g. JSON, parse directly:
r = requests.get('https://query.yahooapis.com/v1/public/yql?q=select')
r.json() # {'query': {'count': 1, 'created': '2017-11-17T07:14:12Z', ...
# Query-string parameters go in the params dict
r = requests.get('https://www.douban.com/search', params={'q': 'python', 'cat': '1001'})
r.url # 'https://www.douban.com/search?q=python&cat=1001'
# To send HTTP headers, pass a dict as the headers argument:
r = requests.get('https://www.douban.com/', headers={'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 11_0 like Mac OS X) AppleWebKit'})
r.text
# 'nnnn Douban (mobile) ...'
# POST with form data
r = requests.post('https://accounts.douban.com/login', data={'form_email': 'abc@example.com', 'form_password': '123456'})
# requests encodes POST data as application/x-www-form-urlencoded by default.
# To send a JSON body instead, pass the json argument:
params = {'key': 'value'}
r = requests.post(url, json=params) # serialized to JSON internally
# Similarly, file upload (multipart encoding) is reduced to the files argument:
upload_files = {'file': open('report.xls', 'rb')}
r = requests.post(url, files=upload_files)
# Always open uploads in binary mode ('rb') so the byte count matches the file size.
# Response headers
r.headers # {'Content-Type': 'text/html; charset=utf-8', 'Transfer-Encoding': 'chunked', 'Content-Encoding': 'gzip', ...}
# Headers that were actually sent with the request
r.request.headers
# requests parses Set-Cookie for you — read a cookie by name:
r.cookies['ts'] # 'example_cookie_12345'
# To send cookies, pass a dict as the cookies argument:
cs = {'token': '12345', 'status': 'working'}
r = requests.get(url, cookies=cs)
# Finally, set a timeout in seconds:
r = requests.get(url, timeout=2.5) # give up after 2.5 seconds
elasticsearch
pip3 install elasticsearch
from elasticsearch import helpers, Elasticsearch

# Connect to Elasticsearch.
es_db = Elasticsearch(hosts="http://xxxx:9200", http_auth=(username, password), timeout=10)

# Bulk-action structure: metadata fields (_index, _id, _score) plus the
# document body under _source.  (Defined BEFORE the bulk loop — the original
# notes used es_doc_list before assigning it.)
es_doc_list = [{
    "_index": "",
    "_id": "",
    "_score": 100,
    "_source": {"name": "", "age": 10}
}]
# When there is no _source key, the metadata keys are popped automatically and
# the remaining keys become the document fields, so this form is equivalent:
es_doc_list = [{
    "_index": "",
    "_id": "",
    "_score": 100,
    "name": "",
    "age": 10
}]
# Per the official docs, JSON strings are also accepted as actions.

# Bulk insert.
# parallel_bulk is lazy: the results MUST be iterated for it to execute.
# chunk_size: number of documents sent per request (default 500).
for success, info in helpers.parallel_bulk(es_db, es_doc_list, chunk_size=5000):
    if not success:
        logger.error("es文档插入失败: %s ", info)



