Download Export File
The Download Export File API retrieves the data file generated by an export task after a successful export execution.
Overview
After an export task completes successfully, use this endpoint to download the generated file. The file format depends on the export configuration (CSV, TXT, XML, etc.).
Endpoint Details
URL Structure
{BASE_URL}/service/api/export/download_file/{TASK_ID}
Full URL Example
https://demo.epmwarecloud.com/service/api/export/download_file/244596
Method
GET
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| TASK_ID | Path | Yes | Task ID from export execution |
Request Examples
Using curl
# Download to current directory
curl -X GET 'https://demo.epmwarecloud.com/service/api/export/download_file/244596' \
  -H 'Authorization: Token 15388ad5-c9af-4cf3-af47-8021c1ab3fb7' \
  -o export_244596.txt

# Download with original filename (from Content-Disposition)
curl -OJ 'https://demo.epmwarecloud.com/service/api/export/download_file/244596' \
  -H 'Authorization: Token 15388ad5-c9af-4cf3-af47-8021c1ab3fb7'
Using Python
import requests
import os
import re

def download_export_file(base_url, token, task_id, output_dir=None):
    """
    Download export file.

    Args:
        base_url: EPMware base URL
        token: Authentication token
        task_id: Task ID from export execution
        output_dir: Directory to save file (optional)

    Returns:
        str: Path to downloaded file
    """
    url = f"{base_url}/service/api/export/download_file/{task_id}"
    headers = {
        'Authorization': f'Token {token}'
    }

    # Stream download for large files
    response = requests.get(url, headers=headers, stream=True)
    if response.status_code != 200:
        raise Exception(f"Download failed: {response.status_code} - {response.text}")

    # Determine filename
    filename = f"export_{task_id}.txt"  # Default

    # Try to get filename from Content-Disposition header
    content_disposition = response.headers.get('Content-Disposition')
    if content_disposition:
        match = re.search(r'filename="?([^";\n]+)"?', content_disposition)
        if match:
            filename = match.group(1)

    # Determine output path
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
        filepath = os.path.join(output_dir, filename)
    else:
        filepath = filename

    # Download file
    total_size = int(response.headers.get('Content-Length', 0))
    downloaded = 0

    with open(filepath, 'wb') as file:
        for chunk in response.iter_content(chunk_size=8192):
            if chunk:
                file.write(chunk)
                downloaded += len(chunk)

                # Progress indicator
                if total_size > 0:
                    percent = (downloaded / total_size) * 100
                    print(f"Downloading: {percent:.1f}%", end='\r')

    print(f"\n✅ Downloaded: {filepath}")

    # Get file size
    file_size = os.path.getsize(filepath)
    print(f"📊 File size: {file_size:,} bytes ({file_size/1024/1024:.2f} MB)")

    return filepath

# Usage
file_path = download_export_file(
    base_url="https://demo.epmwarecloud.com",
    token="15388ad5-c9af-4cf3-af47-8021c1ab3fb7",
    task_id="244596",
    output_dir="/exports/daily/"
)
Using PowerShell
function Download-EPMwareExportFile {
    param(
        [Parameter(Mandatory=$true)]
        [string]$BaseUrl,

        [Parameter(Mandatory=$true)]
        [string]$Token,

        [Parameter(Mandatory=$true)]
        [string]$TaskId,

        [string]$OutputPath = "."
    )

    $url = "$BaseUrl/service/api/export/download_file/$TaskId"
    $headers = @{
        "Authorization" = "Token $Token"
    }

    # Ensure output directory exists
    if (!(Test-Path $OutputPath)) {
        New-Item -ItemType Directory -Path $OutputPath -Force | Out-Null
    }

    $outputFile = Join-Path $OutputPath "export_$TaskId.txt"

    try {
        # Download file
        Write-Host "📥 Downloading export file..." -ForegroundColor Cyan
        $progressPreference = 'Continue'

        Invoke-WebRequest `
            -Uri $url `
            -Headers $headers `
            -OutFile $outputFile `
            -UseBasicParsing

        # Get file info
        $fileInfo = Get-Item $outputFile
        $sizeMB = [math]::Round($fileInfo.Length / 1MB, 2)

        Write-Host "✅ File downloaded successfully!" -ForegroundColor Green
        Write-Host "📁 Location: $outputFile"
        Write-Host "📊 Size: $sizeMB MB"

        return $outputFile
    }
    catch {
        Write-Host "❌ Download failed: $_" -ForegroundColor Red
        throw
    }
}

# Usage
$file = Download-EPMwareExportFile `
    -BaseUrl "https://demo.epmwarecloud.com" `
    -Token "15388ad5-c9af-4cf3-af47-8021c1ab3fb7" `
    -TaskId "244596" `
    -OutputPath "C:\Exports"
Response Format
Headers
| Header | Description | Example |
|---|---|---|
| Content-Type | File MIME type | text/csv, application/octet-stream |
| Content-Length | File size in bytes | 1524367 |
| Content-Disposition | Filename information | attachment; filename="PBCS_export_20250115.csv" |
Body
The response body contains the raw file data. The format depends on the export configuration:

- CSV files for tabular data
- TXT files for delimited data
- XML files for structured data
- ZIP files for compressed exports
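Because the returned format varies by profile, it helps to branch on the Content-Type header (or the filename) before processing. Below is a minimal sketch; the save_export helper and its ZIP handling are illustrative assumptions, and the exact MIME types your server returns may differ.

```python
import io
import os
import zipfile

def save_export(response, filepath):
    """Save a completed download response, extracting it first if it is a ZIP.

    `response` is a requests.Response from the download endpoint (assumption).
    Returns the list of file paths written.
    """
    content_type = response.headers.get('Content-Type', '')

    if 'zip' in content_type or filepath.endswith('.zip'):
        # Compressed export: extract all members next to the target path
        with zipfile.ZipFile(io.BytesIO(response.content)) as zf:
            target_dir = os.path.dirname(filepath) or '.'
            zf.extractall(target_dir)
            return [os.path.join(target_dir, name) for name in zf.namelist()]

    # CSV/TXT/XML: write the raw bytes as-is
    with open(filepath, 'wb') as f:
        f.write(response.content)
    return [filepath]
```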
Complete Export Workflow
import os
import time
import requests
from datetime import datetime

class ExportManager:
    """Manage complete export workflow"""

    def __init__(self, base_url, token):
        self.base_url = base_url
        self.token = token
        self.headers = {
            'Authorization': f'Token {token}',
            'Content-Type': 'application/json'
        }

    def run_export(self, profile_name):
        """Start export execution"""
        url = f"{self.base_url}/service/api/export/run"
        data = {'name': profile_name}
        response = requests.post(url, headers=self.headers, json=data)
        response.raise_for_status()
        return response.json()['taskId']

    def check_status(self, task_id):
        """Check export task status"""
        url = f"{self.base_url}/service/api/task/get_status/{task_id}"
        response = requests.get(url, headers={'Authorization': f'Token {self.token}'})
        response.raise_for_status()
        return response.json()

    def download_file(self, task_id, output_dir):
        """Download export file"""
        url = f"{self.base_url}/service/api/export/download_file/{task_id}"
        response = requests.get(url, headers={'Authorization': f'Token {self.token}'},
                                stream=True)
        response.raise_for_status()

        # Generate filename with timestamp
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f"export_{task_id}_{timestamp}.txt"
        filepath = os.path.join(output_dir, filename)

        # Download with progress
        total_size = int(response.headers.get('Content-Length', 0))
        with open(filepath, 'wb') as file:
            downloaded = 0
            for chunk in response.iter_content(chunk_size=8192):
                file.write(chunk)
                downloaded += len(chunk)
                if total_size > 0:
                    progress = (downloaded / total_size) * 100
                    bars = '█' * int(progress / 5) + '░' * (20 - int(progress / 5))
                    print(f'\r[{bars}] {progress:.1f}%', end='', flush=True)
        print()  # New line after progress bar
        return filepath

    def execute_export_workflow(self, profile_name, output_dir, timeout_minutes=30):
        """Execute complete export workflow"""
        print(f"🚀 Starting export: {profile_name}")
        print("=" * 50)

        # Start export
        task_id = self.run_export(profile_name)
        print(f"📋 Task ID: {task_id}")

        # Monitor progress
        start_time = time.time()
        timeout_seconds = timeout_minutes * 60

        while time.time() - start_time < timeout_seconds:
            status = self.check_status(task_id)
            print(f"⏳ Status: {status['status']} - Progress: {status['percentCompleted']}%")

            if status['status'] == 'S':
                print("✅ Export completed successfully!")

                # Download file
                print("\n📥 Downloading export file...")
                filepath = self.download_file(task_id, output_dir)

                # Verify file
                if os.path.exists(filepath):
                    file_size = os.path.getsize(filepath)
                    print(f"💾 File saved: {filepath}")
                    print(f"📊 Size: {file_size:,} bytes ({file_size/1024/1024:.2f} MB)")
                    return {
                        'success': True,
                        'task_id': task_id,
                        'filepath': filepath,
                        'size': file_size
                    }
                else:
                    print("❌ File download verification failed")
                    return {
                        'success': False,
                        'task_id': task_id,
                        'error': 'Download verification failed'
                    }
            elif status['status'] == 'E':
                print(f"❌ Export failed: {status.get('message', 'Unknown error')}")
                return {
                    'success': False,
                    'task_id': task_id,
                    'error': status.get('message')
                }

            time.sleep(10)  # Wait before next check

        print(f"⏱️ Export timed out after {timeout_minutes} minutes")
        return {
            'success': False,
            'task_id': task_id,
            'error': 'Timeout'
        }

# Usage
manager = ExportManager(
    base_url="https://demo.epmwarecloud.com",
    token="15388ad5-c9af-4cf3-af47-8021c1ab3fb7"
)

result = manager.execute_export_workflow(
    profile_name="PBCS",
    output_dir="/exports/daily",
    timeout_minutes=30
)

if result['success']:
    print(f"\n🎉 Export workflow completed successfully!")
    print(f"📁 File: {result['filepath']}")
else:
    print(f"\n😞 Export workflow failed: {result.get('error')}")
Download Options
Streaming Large Files
def download_large_export(base_url, token, task_id, output_path, chunk_size=1024*1024):
    """
    Download large export files efficiently.

    Args:
        chunk_size: Download chunk size (default 1 MB)
    """
    url = f"{base_url}/service/api/export/download_file/{task_id}"
    headers = {'Authorization': f'Token {token}'}

    with requests.get(url, headers=headers, stream=True) as response:
        response.raise_for_status()
        total_size = int(response.headers.get('Content-Length', 0))

        with open(output_path, 'wb') as file:
            downloaded = 0
            start_time = time.time()

            for chunk in response.iter_content(chunk_size=chunk_size):
                if chunk:
                    file.write(chunk)
                    downloaded += len(chunk)

                    # Calculate download speed
                    elapsed = time.time() - start_time
                    if elapsed > 0:
                        speed = downloaded / elapsed / 1024 / 1024  # MB/s
                        if total_size > 0:
                            percent = (downloaded / total_size) * 100
                            remaining = total_size - downloaded
                            eta = remaining / (downloaded / elapsed) if downloaded > 0 else 0
                            print(f"Progress: {percent:.1f}% | "
                                  f"Speed: {speed:.2f} MB/s | "
                                  f"ETA: {eta:.0f}s", end='\r')

    print(f"\n✅ Download complete: {output_path}")
Resume Interrupted Downloads
def download_with_resume(base_url, token, task_id, output_path):
    """Download with resume capability"""
    url = f"{base_url}/service/api/export/download_file/{task_id}"
    headers = {'Authorization': f'Token {token}'}

    # Check if a partial file exists
    if os.path.exists(output_path):
        resume_pos = os.path.getsize(output_path)
        headers['Range'] = f'bytes={resume_pos}-'
        mode = 'ab'
        print(f"📂 Resuming download from byte {resume_pos}")
    else:
        resume_pos = 0
        mode = 'wb'

    response = requests.get(url, headers=headers, stream=True)

    # Check if the server supports resume
    if response.status_code == 206:  # Partial Content
        print("✅ Server supports resume")
    elif response.status_code == 200:
        if resume_pos > 0:
            print("⚠️ Server doesn't support resume, restarting download")
            mode = 'wb'
    else:
        response.raise_for_status()

    with open(output_path, mode) as file:
        for chunk in response.iter_content(chunk_size=8192):
            if chunk:
                file.write(chunk)

    print(f"✅ Download complete: {output_path}")
File Processing
Process Downloaded CSV
import pandas as pd

def process_export_csv(filepath):
    """Process downloaded CSV file"""
    print(f"📊 Processing export file: {filepath}")

    # Quick analysis with pandas
    try:
        df = pd.read_csv(filepath)

        print(f"\n📈 File Statistics:")
        print(f"   Rows: {len(df):,}")
        print(f"   Columns: {len(df.columns)}")
        print(f"   Memory usage: {df.memory_usage().sum() / 1024 / 1024:.2f} MB")

        print(f"\n📋 Columns:")
        for col in df.columns:
            print(f"   - {col} ({df[col].dtype})")

        # Sample data
        print(f"\n👀 Sample data (first 5 rows):")
        print(df.head())

        return df
    except Exception as e:
        print(f"❌ Error processing CSV: {e}")
        return None

# Usage
df = process_export_csv("/exports/export_244596.csv")
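For exports too large to load into memory at once, pandas can also read the file in chunks. A minimal sketch under that assumption; replace the per-chunk step with whatever your pipeline needs (filtering, aggregation, database loads).

```python
import pandas as pd

def process_large_export_csv(filepath, chunk_size=100_000):
    """Process a large CSV in chunks instead of loading it all at once."""
    total_rows = 0
    for chunk in pd.read_csv(filepath, chunksize=chunk_size):
        # Process each chunk here (filter, aggregate, load to a database, etc.)
        total_rows += len(chunk)
    print(f"📈 Processed {total_rows:,} rows in chunks of {chunk_size:,}")
    return total_rows
```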
Archive Exports
import zipfile
from datetime import datetime, timedelta

def archive_old_exports(export_dir, archive_dir, days_to_keep=7):
    """Archive exports older than specified days"""
    cutoff_date = datetime.now() - timedelta(days=days_to_keep)
    archived_count = 0

    # Create archive directory
    os.makedirs(archive_dir, exist_ok=True)

    # Create monthly archive
    archive_name = f"exports_{datetime.now():%Y_%m}.zip"
    archive_path = os.path.join(archive_dir, archive_name)

    with zipfile.ZipFile(archive_path, 'a', zipfile.ZIP_DEFLATED) as zipf:
        for filename in os.listdir(export_dir):
            filepath = os.path.join(export_dir, filename)

            # Check file age
            file_time = datetime.fromtimestamp(os.path.getmtime(filepath))
            if file_time < cutoff_date:
                # Add to archive, then remove the original
                zipf.write(filepath, filename)
                os.remove(filepath)
                archived_count += 1
                print(f"📦 Archived: {filename}")

    print(f"\n✅ Archived {archived_count} files to {archive_path}")
    return archived_count

# Usage
archive_old_exports(
    export_dir="/exports/daily",
    archive_dir="/exports/archive",
    days_to_keep=7
)
Error Handling
Common Errors
| Error | Cause | Solution |
|---|---|---|
| 404 Not Found | Invalid task ID or file not ready | Verify task ID and completion status |
| 401 Unauthorized | Invalid token | Check authentication |
| 410 Gone | Export file expired/deleted | Re-run export |
| 500 Internal Server Error | Server issue | Retry or contact support |
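These cases map directly onto a status-code check before saving the response body. A minimal sketch of that branching; the exception choices here are illustrative, not part of the API.

```python
def handle_download_response(response, task_id):
    """Translate common download failures into actionable errors."""
    if response.status_code == 200:
        return response
    if response.status_code == 404:
        raise ValueError(f"Task {task_id}: invalid task ID or file not ready")
    if response.status_code == 401:
        raise PermissionError("Invalid token - check authentication")
    if response.status_code == 410:
        raise FileNotFoundError(f"Task {task_id}: export file expired - re-run the export")
    # 500 and anything else: surface the error for retry or support
    response.raise_for_status()
```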
Error Recovery
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10)
)
def download_with_retry(base_url, token, task_id, output_path):
    """Download with automatic retry on failure"""
    try:
        return download_export_file(base_url, token, task_id, output_path)
    except requests.exceptions.RequestException as e:
        print(f"⚠️ Download attempt failed: {e}")
        raise
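Note that tenacity is a third-party package (pip install tenacity). If you prefer to avoid the dependency, a plain loop with exponential backoff gives similar behavior; this sketch reuses the download_export_file function defined earlier.

```python
import time
import requests

def download_with_manual_retry(base_url, token, task_id, output_path, attempts=3):
    """Retry a download with exponential backoff, without tenacity."""
    for attempt in range(1, attempts + 1):
        try:
            return download_export_file(base_url, token, task_id, output_path)
        except requests.exceptions.RequestException as e:
            if attempt == attempts:
                raise
            wait = min(4 * 2 ** (attempt - 1), 10)  # 4s, 8s, capped at 10s
            print(f"⚠️ Attempt {attempt} failed ({e}); retrying in {wait}s")
            time.sleep(wait)
```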
Best Practices
1. Validate Before Download
def validate_before_download(task_id):
    """Validate task is ready for download"""
    # Check task status
    status = check_task_status(task_id)

    if status['status'] != 'S':
        raise ValueError(f"Task {task_id} not successful: {status['status']}")

    if status['percentCompleted'] != '100':
        raise ValueError(f"Task {task_id} not complete: {status['percentCompleted']}%")

    return True
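check_task_status is not defined in this section; below is a minimal sketch against the same task-status endpoint used by ExportManager.check_status above. The hard-coded defaults are for illustration only.

```python
import requests

def check_task_status(task_id,
                      base_url="https://demo.epmwarecloud.com",
                      token="15388ad5-c9af-4cf3-af47-8021c1ab3fb7"):
    """Fetch task status from the task status endpoint."""
    url = f"{base_url}/service/api/task/get_status/{task_id}"
    response = requests.get(url, headers={'Authorization': f'Token {token}'})
    response.raise_for_status()
    return response.json()
```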
2. Implement Download Queue
import queue
import threading

class DownloadQueue:
    """Queue multiple downloads"""

    def __init__(self, base_url, token, max_concurrent=3):
        self.base_url = base_url
        self.token = token
        self.queue = queue.Queue()
        self.max_concurrent = max_concurrent

    def add_download(self, task_id, output_dir):
        """Add download to queue"""
        self.queue.put((task_id, output_dir))

    def process_downloads(self):
        """Process download queue"""
        threads = []
        for _ in range(self.max_concurrent):
            t = threading.Thread(target=self._worker)
            t.daemon = True
            t.start()
            threads.append(t)

        # Wait for completion
        self.queue.join()

    def _worker(self):
        """Worker thread for downloads"""
        while True:
            task_id, output_dir = self.queue.get()
            try:
                filepath = download_export_file(
                    self.base_url, self.token, task_id, output_dir
                )
                print(f"✅ Downloaded: {filepath}")
            except Exception as e:
                print(f"❌ Failed to download {task_id}: {e}")
            finally:
                self.queue.task_done()
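A usage sketch; the second and third task IDs below are hypothetical stand-ins for IDs returned by your own export runs.

```python
# Usage
dq = DownloadQueue(
    base_url="https://demo.epmwarecloud.com",
    token="15388ad5-c9af-4cf3-af47-8021c1ab3fb7",
    max_concurrent=3
)
for task_id in ["244596", "244597", "244598"]:  # 244597/244598 are hypothetical
    dq.add_download(task_id, output_dir="/exports/daily")
dq.process_downloads()
```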
3. Verify File Integrity
import hashlib

def verify_download_integrity(filepath, expected_hash=None):
    """Verify downloaded file integrity"""
    # Calculate file hash
    sha256_hash = hashlib.sha256()
    with open(filepath, 'rb') as file:
        for chunk in iter(lambda: file.read(4096), b""):
            sha256_hash.update(chunk)

    file_hash = sha256_hash.hexdigest()

    if expected_hash:
        if file_hash == expected_hash:
            print(f"✅ File integrity verified")
            return True
        else:
            print(f"❌ File integrity check failed")
            return False
    else:
        print(f"📋 File SHA256: {file_hash}")
        return file_hash
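The download endpoint is not documented to return a checksum, so a common pattern is to record the hash immediately after download and compare it before later processing:

```python
# Usage: record the hash after download, verify before later processing
original_hash = verify_download_integrity("/exports/export_244596.csv")

# ... later, e.g. before processing an archived copy ...
verify_download_integrity("/exports/export_244596.csv", expected_hash=original_hash)
```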
Related Operations
- Run Export - Start export execution
- Get Export Details - Get export history
- Get Task Status - Monitor export progress
Next Steps
After downloading export files:

1. Validate file content and format
2. Process data as needed
3. Archive or move to permanent storage
4. Clean up temporary files