根据官方教程《Quickstart: Upload, download, and list blobs with Python》中的说明(如下所示),如果尚未注册 Data Lake Storage 多协议访问的公共预览,就无法直接使用 Azure 存储的 Python SDK 对 Azure Data Lake Storage Gen2 执行任何操作。
注意
仅当您在Data Lake Storage上注册多协议访问的公共预览时,本文中介绍的功能才可用于具有分层名称空间的帐户。要查看限制,请参阅已知问题文章。
因此,将数据上传到 ADLS Gen2 的唯一办法是使用 ADLS Gen2 的 REST API,详见其参考文档《Azure Data Lake Store REST API》。
这是我的示例代码,可以使用Python将数据上传到ADLS Gen2,并且工作正常。
import json

import requests


def auth(tenant_id, client_id, client_secret):
    """Request an OAuth2 access token using the client-credentials grant.

    Posts the service principal's credentials to the tenant's v2.0 token
    endpoint with the ``https://storage.azure.com/.default`` scope.

    Returns:
        A ``(status_code, body)`` tuple where ``body`` is the response text
        parsed as JSON.
    """
    print('auth')
    auth_headers = {
        "Content-Type": "application/x-www-form-urlencoded"
    }
    auth_body = {
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": "https://storage.azure.com/.default",
        "grant_type": "client_credentials"
    }
    resp = requests.post(
        f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token",
        headers=auth_headers,
        data=auth_body,
    )
    return (resp.status_code, json.loads(resp.text))


def mkfs(account_name, fs_name, access_token):
    """Create a filesystem in the given storage account.

    Returns:
        A ``(status_code, response_text)`` tuple.
    """
    print('mkfs')
    fs_headers = {
        "Authorization": f"Bearer {access_token}"
    }
    resp = requests.put(
        f"https://{account_name}.dfs.core.windows.net/{fs_name}?resource=filesystem",
        headers=fs_headers,
    )
    return (resp.status_code, resp.text)


def mkdir(account_name, fs_name, dir_name, access_token):
    """Create a directory inside a filesystem.

    Returns:
        A ``(status_code, response_text)`` tuple.
    """
    print('mkdir')
    dir_headers = {
        "Authorization": f"Bearer {access_token}"
    }
    resp = requests.put(
        f"https://{account_name}.dfs.core.windows.net/{fs_name}/{dir_name}?resource=directory",
        headers=dir_headers,
    )
    return (resp.status_code, resp.text)


def touch_file(account_name, fs_name, dir_name, file_name, access_token):
    """Create a file resource at ``dir_name/file_name``.

    Returns:
        A ``(status_code, response_text)`` tuple.
    """
    print('touch_file')
    touch_file_headers = {
        "Authorization": f"Bearer {access_token}"
    }
    resp = requests.put(
        f"https://{account_name}.dfs.core.windows.net/{fs_name}/{dir_name}/{file_name}?resource=file",
        headers=touch_file_headers,
    )
    return (resp.status_code, resp.text)


def append_file(account_name, fs_name, path, content, position, access_token):
    """Send ``content`` to the file at ``path`` with ``action=append``.

    ``position`` is passed through as the ``position`` query parameter.

    Returns:
        A ``(status_code, response_text)`` tuple.
    """
    print('append_file')
    append_file_headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "text/plain",
        "Content-Length": str(len(content))
    }
    resp = requests.patch(
        f"https://{account_name}.dfs.core.windows.net/{fs_name}/{path}?action=append&position={position}",
        headers=append_file_headers,
        data=content,
    )
    return (resp.status_code, resp.text)


def flush_file(account_name, fs_name, path, position, access_token):
    """Issue ``action=flush`` for the file at ``path``.

    ``position`` is passed through as the ``position`` query parameter.

    Returns:
        A ``(status_code, response_text)`` tuple.
    """
    print('flush_file')
    flush_file_headers = {
        "Authorization": f"Bearer {access_token}"
    }
    resp = requests.patch(
        f"https://{account_name}.dfs.core.windows.net/{fs_name}/{path}?action=flush&position={position}",
        headers=flush_file_headers,
    )
    return (resp.status_code, resp.text)


def mkfile(account_name, fs_name, dir_name, file_name, local_file_name, access_token):
    """Upload a local file to ``dir_name/file_name``.

    Creates the remote file, appends the whole local file content at
    position 0, then flushes at ``len(content)``. If any step does not
    succeed, the service's response text is printed and the upload stops.
    """
    print('mkfile')
    status_code, result = touch_file(account_name, fs_name, dir_name, file_name, access_token)
    if status_code != 201:
        print(result)
        return

    # Read everything up front so the local file is closed before the
    # network calls start.
    with open(local_file_name, 'rb') as local_file:
        content = local_file.read()

    path = f"{dir_name}/{file_name}"

    status_code, result = append_file(account_name, fs_name, path, content, 0, access_token)
    if not 200 <= status_code < 300:
        print(result)
        return

    # The flush position is the total number of bytes appended so far.
    status_code, result = flush_file(account_name, fs_name, path, len(content), access_token)
    if not 200 <= status_code < 300:
        print(result)


def main():
    """Run the end-to-end sample: authenticate, create fs/dir, upload a file."""
    tenant_id = '<your tenant id>'
    client_id = '<your client id>'
    client_secret = '<your client secret>'

    account_name = '<your adls account name>'
    fs_name = '<your filesystem name>'
    dir_name = '<your directory name>'
    file_name = '<your file name>'
    local_file_name = '<your local file name>'

    # Acquire an Access token
    auth_status_code, auth_result = auth(tenant_id, client_id, client_secret)
    access_token = auth_result['access_token'] if auth_status_code == 200 else ''
    print(access_token)

    # Create a filesystem
    mkfs_status_code, mkfs_result = mkfs(account_name, fs_name, access_token)
    print(mkfs_status_code, mkfs_result)

    # Create a directory
    mkdir_status_code, mkdir_result = mkdir(account_name, fs_name, dir_name, access_token)
    print(mkdir_status_code, mkdir_result)

    # Create a file from local file
    mkfile(account_name, fs_name, dir_name, file_name, local_file_name, access_token)


if __name__ == '__main__':
    main()

# Hope this helps.



