解决方法:
我终于找到了解决方法:通过CloudSQL实例的公共IP进行连接。为此,您需要允许从任意IP连接到CloudSQL实例:
- 转到GCP中的CloudSQL实例的概述页面
- 点击 Authorization 标签
- 单击 Add network 并添加 0.0.0.0/0
(!!注意:这将允许任何IP地址连接到您的实例!!)
为了增加流程的安全性,我使用了SSL密钥,并且只允许与实例的SSL连接:
- 点击 SSL 标签
- 单击 Create a new certificate,为您的服务器创建SSL证书
- 单击 Create a client certificate,为您的客户端创建SSL证书
- 单击 Allow only SSL connections,以拒绝所有非SSL连接尝试
之后,我将证书存储在Google Cloud Storage存储桶中并加载它们,然后在Dataflow作业中进行连接,即:
import os
import select
import stat

import psycopg2
import psycopg2.extensions
from google.cloud import storage


def wait(conn):
    """Block until an asynchronous psycopg2 connection is ready for use.

    Needed when processing in parallel: repeatedly poll the connection
    and use select() to sleep until its socket is readable/writable.

    Raises:
        psycopg2.OperationalError: if poll() returns an unexpected state.
    """
    while True:
        state = conn.poll()
        if state == psycopg2.extensions.POLL_OK:
            break
        elif state == psycopg2.extensions.POLL_WRITE:
            # Wait until the socket becomes writable, then poll again.
            select.select([], [conn.fileno()], [])
        elif state == psycopg2.extensions.POLL_READ:
            # Wait until the socket becomes readable, then poll again.
            select.select([conn.fileno()], [], [])
        else:
            raise psycopg2.OperationalError("poll() returned %s" % state)


def connect_to_db(host, hostaddr, dbname, user, password, sslmode='verify-full'):
    """Return a psycopg2 connection to CloudSQL secured with SSL certificates.

    Downloads the server CA, client key and client certificate from a
    Cloud Storage bucket, then opens an SSL-only connection.
    """
    # Fetch the SSL certificates from GCS.
    client = storage.Client()
    bucket = client.get_bucket('<YOUR_BUCKET_NAME>')  # TODO: set your bucket name
    bucket.get_blob('PATH_TO/server-ca.pem').download_to_filename('server-ca.pem')
    bucket.get_blob('PATH_TO/client-key.pem').download_to_filename('client-key.pem')
    # libpq rejects a private key readable by others; restrict it to the owner.
    os.chmod('client-key.pem', stat.S_IRWXU)
    bucket.get_blob('PATH_TO/client-cert.pem').download_to_filename('client-cert.pem')

    con = psycopg2.connect(
        host=host,
        hostaddr=hostaddr,
        dbname=dbname,
        user=user,
        password=password,
        sslmode=sslmode,
        sslrootcert='server-ca.pem',
        sslcert='client-cert.pem',
        sslkey='client-key.pem')
    return con
然后,我在自定义中使用这些功能
ParDo来执行查询。
最小示例:
import apache_beam as beam


class ReadSQLTableNames(beam.DoFn):
    """ParDo that yields the name of every table in a given CloudSQL database."""

    def __init__(self, host, hostaddr, dbname, username, password):
        super(ReadSQLTableNames, self).__init__()
        self.host = host
        self.hostaddr = hostaddr
        self.dbname = dbname
        self.username = username
        self.password = password

    def process(self, element):
        """Yield each table name in the 'public' schema.

        The input element is only a trigger and is ignored.
        """
        con = connect_to_db(
            host=self.host,
            hostaddr=self.hostaddr,
            dbname=self.dbname,
            user=self.username,
            password=self.password)
        # Wait for a free/ready connection (helper defined alongside
        # connect_to_db; the original mistakenly called wait_select).
        wait(con)
        # RealDictCursor returns each row as a dict keyed by column name.
        # NOTE(review): requires `from psycopg2.extras import RealDictCursor`.
        cur = con.cursor(cursor_factory=RealDictCursor)
        cur.execute(
            """
            SELECT tablename as table
            FROM pg_tables
            WHERE schemaname = 'public'
            """)
        table_names = cur.fetchall()
        cur.close()
        con.close()
        for table_name in table_names:
            yield table_name["table"]
然后,管道的一部分可能看起来像这样:
# Current workaround to query all tables:
# seed the pipeline with a single dummy element, then fan out to one
# output element per table name via the custom ParDo.
init = (
    p
    | 'Begin pipeline with initiator' >> beam.Create(['All tables initializer']))

tables = (
    init
    | 'Get table names' >> beam.ParDo(ReadSQLTableNames(
        host=known_args.host,
        hostaddr=known_args.hostaddr,
        dbname=known_args.db_name,
        username=known_args.user,
        password=known_args.password)))
我希望此解决方案可以帮助其他遇到类似问题的人。



