I am trying to use the Microsoft Graph API in Python to move SharePoint files around based on some input. The download part already works, but moving a file fails for some reason. Here's my code:
async def _move_sharepoint_file(self, url, site_name, src_drive_path, file_name, dst_site_name, dst_drive_path, new_file_name=None):
    """Move a SharePoint file into another folder, optionally renaming it.

    The source and destination sites are looked up by name among the sites
    returned for ``url``. The destination folder is resolved by walking
    ``dst_drive_path`` one segment at a time, and the source item is then
    PATCHed with a new parent reference (and name).

    Args:
        url: URL handed to ``sites.with_url`` to list candidate sites.
        site_name: Name of the site that holds the file.
        src_drive_path: Path of the folder containing the file; passed
            unchanged to ``self.get_item_id``.
        file_name: Name of the file to move.
        dst_site_name: Name of the site that holds the destination folder.
        dst_drive_path: Folder path inside the destination, using ``/`` or
            ``\\`` as separator. Empty segments are ignored.
        new_file_name: Optional new name; ``file_name`` is kept when falsy.

    Returns:
        Whatever the PATCH call returns, or ``None`` when it returns nothing.

    Raises:
        ValueError: If a site, the source item, or the destination folder
            cannot be found (this includes an empty destination path).
    """
    sites = await self.user_client.sites.with_url(url).get()

    # First occurrence of each name wins, like a linear search with `break`.
    site_ids = {}
    for site in sites.value:
        site_ids.setdefault(site.name, site.id)

    if site_name not in site_ids:
        raise ValueError(f"Source site with name '{site_name}' not found")
    src_site_id = site_ids[site_name]

    if dst_site_name not in site_ids:
        raise ValueError(f"Destination site with name '{dst_site_name}' not found")
    dst_site_id = site_ids[dst_site_name]

    # Find the source item
    src_item_id = await self.get_item_id(src_site_id, src_drive_path, file_name)
    if src_item_id is None:
        raise ValueError(f"Item with name '{file_name}' in path '{src_drive_path}' not found")

    # Same listing request that _download_sharepoint_file issues.
    query_params = ItemsRequestBuilder.ItemsRequestBuilderGetQueryParameters(
        filter=True
    )
    request_configuration = ItemsRequestBuilder.ItemsRequestBuilderGetRequestConfiguration(
        query_parameters=query_params,
    )

    # Normalise Windows separators and drop empty segments so that leading,
    # trailing or doubled slashes do not produce a lookup for the name "".
    path_list = [part for part in dst_drive_path.replace("\\", "/").split("/") if part]
    print(f"path_list: {path_list}")

    listing = await self.user_client.drives.by_drive_id(dst_site_id).items.get(request_configuration=request_configuration)
    drive_items = listing.value or []

    dst_folder_id = None
    folder = None
    partial_dir = ""
    last_depth = len(path_list) - 1
    for depth, segment in enumerate(path_list):
        partial_dir += f"/{segment}"
        if self.verbose:
            print(f"partial_dir: {partial_dir}")
            for item in drive_items:
                print(f"drive: {item.name}")
            print()

        # Compare the folder facet against None: a plain hasattr() check
        # also matches files, whose `folder` attribute exists but is None.
        folder = next(
            (
                item
                for item in drive_items
                if item.name == segment and getattr(item, "folder", None) is not None
            ),
            None,
        )
        if folder is None:
            raise ValueError(f"Folder with name '{partial_dir}' not found")
        dst_folder_id = folder.id

        # Descend one level so the next segment is matched against this
        # folder's children instead of the top-level listing again.
        if depth < last_depth:
            children = await self.user_client.drives.by_drive_id(dst_site_id).items.by_drive_item_id(dst_folder_id).children.get()
            drive_items = children.value or []

    # Also reached for an empty destination path, where no folder is resolved.
    if dst_folder_id is None:
        raise ValueError(f"Destination path '{dst_drive_path}' not found")
    print(f"Selected Folder Name: {folder.name}, ID: {dst_folder_id}")

    # Prepare the request body
    request_body = DriveItem(
        parent_reference=ItemReference(id=dst_folder_id),
        name=new_file_name if new_file_name else file_name,
    )

    # Move the item by updating its parent reference.
    # NOTE(review): a *site* id is passed to by_drive_id() here, while
    # _download_sharepoint_file addresses items through
    # parent_reference.drive_id. This may be what triggers 'accessDenied';
    # confirm which drive id the source item actually lives in.
    result = await self.user_client.drives.by_drive_id(src_site_id).items.by_drive_item_id(src_item_id).patch(request_body)

    # The call reports HTTP failures by raising (e.g. ODataError), so the
    # returned object is not an HTTP response with is_success/status_code.
    if result is None:
        print("Failed to move file. No item was returned.")
        return None
    print("File moved successfully.")
    return result
When I run this code, I eventually receive the following error (request IDs redacted):
msgraph.generated.models.o_data_errors.o_data_error.ODataError:
APIError
Code: 403
message: None
error: MainError(additional_data={}, code='accessDenied', details=None, inner_error=InnerError(additional_data={}, client_request_id='XXXXX', date=DateTime(2024, 5, 14, 7, 6, 52, tzinfo=Timezone('UTC')), odata_type=None, request_id='XXXX'), message='Access denied', target=None)
Please note that I do have the right permissions on the SharePoint sites I am trying to access. We also have the correct permission scope (Files.ReadWrite.All).
My colleague is the one who wrote the download part, but I was given the task to extend this and add a move function. Here is the download function that actually does work:
async def _download_sharepoint_file(self, url, site_name, drive_path, file_names, dst_path=None):
    """Download one or more files from a folder of a SharePoint site.

    The site is looked up by name among the sites returned for ``url``,
    ``drive_path`` is walked one segment at a time, and the items in the
    final folder whose names appear in ``file_names`` are fetched.

    Args:
        url: URL handed to ``sites.with_url`` to list candidate sites.
        site_name: Name of the site to read from.
        drive_path: Folder path using ``/`` or ``\\`` as separator. Empty
            segments are ignored; an empty path selects the initial listing.
        file_names: A single file name or a list of file names.
        dst_path: Directory to save into. When falsy, nothing is written and
            the first fetched result is returned.

    Returns:
        The fetched content (or the item itself when the content request
        failed). With ``dst_path`` set, the result of the last file written.

    Raises:
        ValueError: If the site, a path segment, or all of the requested
            files cannot be found.
    """
    # Calling get_sharepoint_site_drive_contents from here does not work with
    # asyncio, so its body is copied in below.
    sites = await self.user_client.sites.with_url(url).get()
    if self.verbose:
        print(f"Sites:")
        for site in sites.value:
            print(f"site: {site.name}")
        print()

    for site in sites.value:
        if site.name == site_name:
            break
    else:
        raise ValueError(f"Site with name '{site_name}' not found")
    site_id = site.id

    query_params = ItemsRequestBuilder.ItemsRequestBuilderGetQueryParameters(
        filter=True
    )
    request_configuration = ItemsRequestBuilder.ItemsRequestBuilderGetRequestConfiguration(
        query_parameters=query_params,
    )

    # Normalise Windows separators and drop empty segments so that leading,
    # trailing or doubled slashes do not produce a lookup for the name "".
    path_list = [part for part in drive_path.replace("\\", "/").split("/") if part]
    print(path_list)

    listing = await self.user_client.drives.by_drive_id(site_id).items.get(request_configuration=request_configuration)
    drive_items = listing.value or []

    partial_dir = ""
    print(f"path_list: {path_list}")

    # With an empty path the initial listing itself is the candidate set.
    correct_drive_items = drive_items
    for segment in path_list:
        partial_dir += f"/{segment}"
        if self.verbose:
            print(f"partial_dir: {partial_dir}")
            for item in drive_items:
                print(f"drive: {item.name}")
            print()

        matched = next((item for item in drive_items if item.name == segment), None)
        if matched is None:
            raise ValueError(f"Drive with name '{partial_dir}' not found")

        children = await self.user_client.drives.by_drive_id(site_id).items.by_drive_item_id(matched.id).children.get()
        drive_items = children.value or []

        # Keep only children whose parent path ends with the path walked so far.
        correct_drive_items = []
        for child in drive_items:
            if self.verbose:
                print(f"drive_item.name: {child.name}")
            parent_path = child.parent_reference.path
            if self.verbose:
                print(f"drive_item.parent_reference.path: {child.parent_reference.path}")
            if parent_path.endswith(partial_dir):
                correct_drive_items.append(child)
                if self.verbose:
                    print(f"YES")
    drives = correct_drive_items
    #### End of get_sharepoint_site_drive_contents

    if self.verbose:
        for drive_item in drives:
            print(f"drive_item.name: {drive_item.name}")

    if isinstance(file_names, str):
        file_names = [file_names]
    drives = [drive for drive in drives if drive.name in file_names]
    if not drives:
        raise ValueError(f"Drive with name '{file_names}' not found")

    result = None
    saved_names = set()
    for drive in drives:
        # Each item is saved under its own name. Pairing items with
        # `file_names` by position stores content under the wrong name
        # whenever the listing order differs from the requested order.
        file_name = drive.name
        if file_name in saved_names:
            # Several items share this name; the first one wins.
            continue
        saved_names.add(file_name)

        drive_id = drive.id
        parent_drive_id = drive.parent_reference.drive_id
        if self.verbose:
            print(f"drive_id; {drive_id}")
            print(f"parent_drive_id: {parent_drive_id}")

        item_request = self.user_client.drives.by_drive_id(parent_drive_id).items.by_drive_item_id(drive_id)
        try:
            result = await item_request.content.get()
        except Exception:
            # `except Exception` rather than a bare `except`, so task
            # cancellation and KeyboardInterrupt still propagate.
            # Fall back to fetching the item itself.
            result = await item_request.get()

        if not dst_path:
            return result
        if isinstance(result, DriveItem):
            return result

        with open(f"{dst_path}/{file_name}", "wb") as f:
            f.write(result)
        if self.verbose:
            print(f"File saved as: {dst_path}/{file_name}")

    # NOTE(review): the nesting of this final return was ambiguous in the
    # original; it is placed after the loop so every requested file is
    # saved before returning. Confirm.
    return result
I have been struggling with this since yesterday, but everything I try ends up with the same error or a different one. I would appreciate it if anyone could tell me how to fix this error.