Starting the CloudSQL Proxy on Python Dataflow / Apache Beam


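Solution:

I finally found a workaround. The idea is to connect through the public IP of the CloudSQL instance. For that, you need to allow connections to the CloudSQL instance from every IP: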

  1. Go to the overview page of your CloudSQL instance in GCP
  2. Click the Authorization tab
  3. Click Add network and add 0.0.0.0/0
     !! This allows every IP address to connect to your instance !!
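
To make the warning in step 3 concrete, here is a minimal sketch using Python's standard `ipaddress` module. It only illustrates what the CIDR range 0.0.0.0/0 covers; it does not talk to GCP. The narrower range `203.0.113.0/24` and the helper `is_allowed` are hypothetical and used purely for comparison.

```python
import ipaddress

# 0.0.0.0/0 is the network that contains the entire IPv4 address space.
open_to_all = ipaddress.ip_network("0.0.0.0/0")

# Hypothetical narrower allow-list entry (a documentation-only range).
single_range = ipaddress.ip_network("203.0.113.0/24")


def is_allowed(address, network):
    """Return True if `address` falls inside `network`."""
    return ipaddress.ip_address(address) in network


total_addresses = open_to_all.num_addresses
any_host_open = is_allowed("8.8.8.8", open_to_all)
any_host_narrow = is_allowed("8.8.8.8", single_range)

print(total_addresses, any_host_open, any_host_narrow)
```

An arbitrary address matches the open range but not the narrow one, which is why the SSL-only restriction described next matters with this setup.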

To make the process more secure, I used SSL keys and only allowed SSL connections to the instance:

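  1. Click the SSL tab
  2. Click Create a new certificate to create an SSL certificate for your server
  3. Click Create a client certificate to create an SSL certificate for your client
  4. Click Allow only SSL connections to reject all non-SSL connection attempts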

After that, I stored the certificates in a Google Cloud Storage bucket and load them before connecting within the Dataflow job, i.e.:

    import os
    import select
    import stat

    import psycopg2
    import psycopg2.extensions
    from google.cloud import storage


    # Function to wait for an open connection when processing in parallel
    def wait(conn):
        while True:
            state = conn.poll()
            if state == psycopg2.extensions.POLL_OK:
                break
            elif state == psycopg2.extensions.POLL_WRITE:
                select.select([], [conn.fileno()], [])
            elif state == psycopg2.extensions.POLL_READ:
                select.select([conn.fileno()], [], [])
            else:
                raise psycopg2.OperationalError("poll() returned %s" % state)


    # Function which returns a connection that can be used for queries
    def connect_to_db(host, hostaddr, dbname, user, password, sslmode='verify-full'):
        # Get keys from GCS (replace the placeholder bucket name and paths)
        client = storage.Client()
        bucket = client.get_bucket('<YOUR_BUCKET_NAME>')
        bucket.get_blob('PATH_TO/server-ca.pem').download_to_filename('server-ca.pem')
        bucket.get_blob('PATH_TO/client-key.pem').download_to_filename('client-key.pem')
        # Restrict the private key to the owner only
        os.chmod("client-key.pem", stat.S_IRWXU)
        bucket.get_blob('PATH_TO/client-cert.pem').download_to_filename('client-cert.pem')

        sslrootcert = 'server-ca.pem'
        sslkey = 'client-key.pem'
        sslcert = 'client-cert.pem'

        con = psycopg2.connect(
            host=host,
            hostaddr=hostaddr,
            dbname=dbname,
            user=user,
            password=password,
            sslmode=sslmode,
            sslrootcert=sslrootcert,
            sslcert=sslcert,
            sslkey=sslkey)
        return con
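
The `os.chmod` call above matters because, as far as I know, libpq refuses a client key file that group or other users can access. The following is a small sketch of that permission step in isolation, assuming a POSIX file system (as on Dataflow's Linux workers). The helper names `restrict_key_permissions` and `group_or_world_can_access` are hypothetical, and the file written here is a placeholder, not a real key.

```python
import os
import stat
import tempfile


def restrict_key_permissions(path):
    """Make `path` readable and writable by its owner only (mode 0600)."""
    os.chmod(path, stat.S_IRUSR | stat.S_IWUSR)


def group_or_world_can_access(path):
    """Return True if any group/other permission bit is set on `path`."""
    mode = os.stat(path).st_mode
    return bool(mode & (stat.S_IRWXG | stat.S_IRWXO))


with tempfile.TemporaryDirectory() as tmp:
    key_path = os.path.join(tmp, "client-key.pem")
    with open(key_path, "w") as f:
        f.write("placeholder, not a real key\n")

    # Simulate a freshly downloaded file with permissive defaults.
    os.chmod(key_path, 0o644)
    before = group_or_world_can_access(key_path)

    restrict_key_permissions(key_path)
    after = group_or_world_can_access(key_path)

print(before, after)
```

Mode 0600 is slightly tighter than the `stat.S_IRWXU` (0700) used above; both remove group and world access.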

I then use these functions in a custom ParDo to perform queries.
Minimal example:

    import apache_beam as beam
    from psycopg2.extras import RealDictCursor


    class ReadSQLTableNames(beam.DoFn):
        '''
        ParDo class to get all table names of a given CloudSQL database.
        It will return each table name.
        '''

        def __init__(self, host, hostaddr, dbname, username, password):
            super(ReadSQLTableNames, self).__init__()
            self.host = host
            self.hostaddr = hostaddr
            self.dbname = dbname
            self.username = username
            self.password = password

        def process(self, element):
            # Connect to database
            con = connect_to_db(host=self.host, hostaddr=self.hostaddr,
                                dbname=self.dbname, user=self.username,
                                password=self.password)
            # Wait for free connection (wait() is defined above)
            wait(con)
            # Create cursor to query data; rows come back as dicts
            cur = con.cursor(cursor_factory=RealDictCursor)
            # Get all table names
            cur.execute(
                """
                SELECT
                tablename as table
                FROM pg_tables
                WHERE schemaname = 'public'
                """
            )
            table_names = cur.fetchall()
            cur.close()
            con.close()
            for table_name in table_names:
                yield table_name["table"]
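
The example above opens and closes a connection on every call to `process`. A possible variation, not part of the original answer, is to open the connection once in `DoFn.setup()` and close it in `DoFn.teardown()`. The sketch below is an assumption-laden illustration: `QueryColumn`, `connect_fn` and `sqlite_stand_in` are hypothetical names, `sqlite3` is swapped in for CloudSQL so the snippet runs anywhere, and the class falls back to a plain base class when Beam is not installed.

```python
import sqlite3

try:
    import apache_beam as beam
    _Base = beam.DoFn
except ImportError:  # lets the sketch run where Beam is not installed
    _Base = object


class QueryColumn(_Base):
    """Hypothetical DoFn: one connection per instance, reused per element."""

    def __init__(self, connect_fn, query):
        super().__init__()
        self.connect_fn = connect_fn  # callable returning a DB-API connection
        self.query = query
        self.con = None

    def setup(self):
        # Beam calls setup() before processing on each DoFn instance.
        self.con = self.connect_fn()

    def process(self, element):
        cur = self.con.cursor()
        try:
            cur.execute(self.query)
            for row in cur.fetchall():
                yield row[0]
        finally:
            cur.close()

    def teardown(self):
        if self.con is not None:
            self.con.close()
            self.con = None


def sqlite_stand_in():
    """Stand-in for connect_to_db(): an in-memory database with two tables."""
    con = sqlite3.connect(":memory:")
    con.execute("CREATE TABLE orders (id INTEGER)")
    con.execute("CREATE TABLE users (id INTEGER)")
    return con


# Drive the lifecycle by hand, the way a runner would.
fn = QueryColumn(
    sqlite_stand_in,
    "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name",
)
fn.setup()
names = list(fn.process("All tables initializer"))
fn.teardown()
print(names)
```

In a real pipeline, `connect_fn` would wrap `connect_to_db` with the CloudSQL arguments, and the query would be the `pg_tables` query from the example above.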

A part of the pipeline could then look like this:

    # Current workaround to query all tables:
    # Create a dummy initiator PCollection with one element
    init = p | 'Begin pipeline with initiator' >> beam.Create(['All tables initializer'])

    tables = init | 'Get table names' >> beam.ParDo(ReadSQLTableNames(
        host=known_args.host,
        hostaddr=known_args.hostaddr,
        dbname=known_args.db_name,
        username=known_args.user,
        password=known_args.password))
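
The snippet above reads its settings from `known_args`, which the answer does not define. One common way to produce it is `argparse.ArgumentParser.parse_known_args`, sketched below. The flag names are assumptions inferred from the attribute names used above (`host`, `hostaddr`, `db_name`, `user`, `password`), `parse_args` is a hypothetical helper, and all values in the usage call are placeholders.

```python
import argparse


def parse_args(argv=None):
    """Split custom database flags from the remaining pipeline flags."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--host", required=True)
    parser.add_argument("--hostaddr", required=True)
    parser.add_argument("--db_name", required=True)
    parser.add_argument("--user", required=True)
    parser.add_argument("--password", required=True)
    # Unrecognized flags are returned separately instead of raising an error.
    return parser.parse_known_args(argv)


known_args, pipeline_args = parse_args([
    "--host", "HOST_PLACEHOLDER",
    "--hostaddr", "203.0.113.10",
    "--db_name", "mydb",
    "--user", "me",
    "--password", "secret",
    "--runner", "DataflowRunner",
])

print(known_args.db_name, pipeline_args)
```

The leftover `pipeline_args` list can then be handed to Beam's `PipelineOptions` when constructing the pipeline `p`.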

I hope this solution helps others with similar problems.


