Skip to content

Download Export File

The Download Export File API allows you to retrieve the exported data file after a successful export execution. This endpoint downloads the file generated by the export task.

Overview

After an export task completes successfully, use this endpoint to download the generated file. The file format depends on the export configuration (CSV, TXT, XML, etc.).

Endpoint Details

URL Structure

GET /service/api/export/download_file/{TASK_ID}

Full URL Example

https://demo.epmwarecloud.com/service/api/export/download_file/244596

Method

GET

Parameters

Parameter Type Required Description
TASK_ID Path Yes Task ID from export execution

Request Examples

Using curl

# Download to current directory
curl GET 'https://demo.epmwarecloud.com/service/api/export/download_file/244596' \
  -H 'Authorization: Token 15388ad5-c9af-4cf3-af47-8021c1ab3fb7' \
  -o export_244596.txt

# Download with original filename
curl -OJ 'https://demo.epmwarecloud.com/service/api/export/download_file/244596' \
  -H 'Authorization: Token 15388ad5-c9af-4cf3-af47-8021c1ab3fb7'

Using Python

import requests
import os
from urllib.parse import urlparse
import re

def download_export_file(base_url, token, task_id, output_dir=None):
    """
    Download export file

    Args:
        base_url: EPMware base URL
        token: Authentication token
        task_id: Task ID from export execution
        output_dir: Directory to save file (optional)

    Returns:
        str: Path to downloaded file
    """
    url = f"{base_url}/service/api/export/download_file/{task_id}"

    headers = {
        'Authorization': f'Token {token}'
    }

    # Stream download for large files
    response = requests.get(url, headers=headers, stream=True)

    if response.status_code != 200:
        raise Exception(f"Download failed: {response.status_code} - {response.text}")

    # Determine filename
    filename = f"export_{task_id}.txt"  # Default

    # Try to get filename from Content-Disposition header
    content_disposition = response.headers.get('Content-Disposition')
    if content_disposition:
        match = re.search(r'filename="?([^";\n]+)"?', content_disposition)
        if match:
            filename = match.group(1)

    # Determine output path
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
        filepath = os.path.join(output_dir, filename)
    else:
        filepath = filename

    # Download file
    total_size = int(response.headers.get('Content-Length', 0))
    downloaded = 0

    with open(filepath, 'wb') as file:
        for chunk in response.iter_content(chunk_size=8192):
            if chunk:
                file.write(chunk)
                downloaded += len(chunk)

                # Progress indicator
                if total_size > 0:
                    percent = (downloaded / total_size) * 100
                    print(f"Downloading: {percent:.1f}%", end='\r')

    print(f"\n✅ Downloaded: {filepath}")

    # Get file size
    file_size = os.path.getsize(filepath)
    print(f"📊 File size: {file_size:,} bytes ({file_size/1024/1024:.2f} MB)")

    return filepath

# Usage
file_path = download_export_file(
    base_url="https://demo.epmwarecloud.com",
    token="15388ad5-c9af-4cf3-af47-8021c1ab3fb7",
    task_id="244596",
    output_dir="/exports/daily/"
)

Using PowerShell

function Download-EPMwareExportFile {
    param(
        [Parameter(Mandatory=$true)]
        [string]$BaseUrl,

        [Parameter(Mandatory=$true)]
        [string]$Token,

        [Parameter(Mandatory=$true)]
        [string]$TaskId,

        [string]$OutputPath = "."
    )

    $url = "$BaseUrl/service/api/export/download_file/$TaskId"

    $headers = @{
        "Authorization" = "Token $Token"
    }

    # Ensure output directory exists
    if (!(Test-Path $OutputPath)) {
        New-Item -ItemType Directory -Path $OutputPath -Force | Out-Null
    }

    $outputFile = Join-Path $OutputPath "export_$TaskId.txt"

    try {
        # Download file
        Write-Host "📥 Downloading export file..." -ForegroundColor Cyan

        $progressPreference = 'Continue'
        Invoke-WebRequest `
            -Uri $url `
            -Headers $headers `
            -OutFile $outputFile `
            -UseBasicParsing

        # Get file info
        $fileInfo = Get-Item $outputFile
        $sizeMB = [math]::Round($fileInfo.Length / 1MB, 2)

        Write-Host "✅ File downloaded successfully!" -ForegroundColor Green
        Write-Host "📁 Location: $outputFile"
        Write-Host "📊 Size: $sizeMB MB"

        return $outputFile
    }
    catch {
        Write-Host "❌ Download failed: $_" -ForegroundColor Red
        throw
    }
}

# Usage
$file = Download-EPMwareExportFile `
    -BaseUrl "https://demo.epmwarecloud.com" `
    -Token "15388ad5-c9af-4cf3-af47-8021c1ab3fb7" `
    -TaskId "244596" `
    -OutputPath "C:\Exports"

Response Format

Headers

Header Description Example
Content-Type File MIME type text/csv, application/octet-stream
Content-Length File size in bytes 1524367
Content-Disposition Filename information attachment; filename="PBCS_export_20250115.csv"

Body

The response body contains the raw file data. The format depends on the export configuration: - CSV files for tabular data - TXT files for delimited data - XML files for structured data - ZIP files for compressed exports

Complete Export Workflow

import time
from datetime import datetime

class ExportManager:
    """Manage complete export workflow"""

    def __init__(self, base_url, token):
        self.base_url = base_url
        self.token = token
        self.headers = {
            'Authorization': f'Token {token}',
            'Content-Type': 'application/json'
        }

    def run_export(self, profile_name):
        """Start export execution"""

        url = f"{self.base_url}/service/api/export/run"
        data = {'name': profile_name}

        response = requests.post(url, headers=self.headers, json=data)
        response.raise_for_status()

        return response.json()['taskId']

    def check_status(self, task_id):
        """Check export task status"""

        url = f"{self.base_url}/service/api/task/get_status/{task_id}"

        response = requests.get(url, headers={'Authorization': f'Token {self.token}'})
        response.raise_for_status()

        return response.json()

    def download_file(self, task_id, output_dir):
        """Download export file"""

        url = f"{self.base_url}/service/api/export/download_file/{task_id}"

        response = requests.get(url, headers={'Authorization': f'Token {self.token}'}, 
                              stream=True)
        response.raise_for_status()

        # Generate filename with timestamp
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f"export_{task_id}_{timestamp}.txt"
        filepath = os.path.join(output_dir, filename)

        # Download with progress
        total_size = int(response.headers.get('Content-Length', 0))

        with open(filepath, 'wb') as file:
            downloaded = 0
            for chunk in response.iter_content(chunk_size=8192):
                file.write(chunk)
                downloaded += len(chunk)

                if total_size > 0:
                    progress = (downloaded / total_size) * 100
                    bars = 'â–ˆ' * int(progress / 5) + 'â–‘' * (20 - int(progress / 5))
                    print(f'\r[{bars}] {progress:.1f}%', end='', flush=True)

        print()  # New line after progress bar
        return filepath

    def execute_export_workflow(self, profile_name, output_dir, timeout_minutes=30):
        """Execute complete export workflow"""

        print(f"🚀 Starting export: {profile_name}")
        print("=" * 50)

        # Start export
        task_id = self.run_export(profile_name)
        print(f"📋 Task ID: {task_id}")

        # Monitor progress
        start_time = time.time()
        timeout_seconds = timeout_minutes * 60

        while time.time() - start_time < timeout_seconds:
            status = self.check_status(task_id)

            print(f"⏳ Status: {status['status']} - Progress: {status['percentCompleted']}%")

            if status['status'] == 'S':
                print("✅ Export completed successfully!")

                # Download file
                print("\n📥 Downloading export file...")
                filepath = self.download_file(task_id, output_dir)

                # Verify file
                if os.path.exists(filepath):
                    file_size = os.path.getsize(filepath)
                    print(f"💾 File saved: {filepath}")
                    print(f"📊 Size: {file_size:,} bytes ({file_size/1024/1024:.2f} MB)")

                    return {
                        'success': True,
                        'task_id': task_id,
                        'filepath': filepath,
                        'size': file_size
                    }
                else:
                    print("❌ File download verification failed")

            elif status['status'] == 'E':
                print(f"❌ Export failed: {status.get('message', 'Unknown error')}")
                return {
                    'success': False,
                    'task_id': task_id,
                    'error': status.get('message')
                }

            time.sleep(10)  # Wait before next check

        print(f"⏱️ Export timed out after {timeout_minutes} minutes")
        return {
            'success': False,
            'task_id': task_id,
            'error': 'Timeout'
        }

# Usage
manager = ExportManager(
    base_url="https://demo.epmwarecloud.com",
    token="15388ad5-c9af-4cf3-af47-8021c1ab3fb7"
)

result = manager.execute_export_workflow(
    profile_name="PBCS",
    output_dir="/exports/daily",
    timeout_minutes=30
)

if result['success']:
    print(f"\n🎉 Export workflow completed successfully!")
    print(f"📁 File: {result['filepath']}")
else:
    print(f"\n😞 Export workflow failed: {result.get('error')}")

Download Options

Streaming Large Files

def download_large_export(base_url, token, task_id, output_path, chunk_size=1024*1024):
    """
    Download large export files efficiently

    Args:
        chunk_size: Download chunk size (default 1MB)
    """
    url = f"{base_url}/service/api/export/download_file/{task_id}"
    headers = {'Authorization': f'Token {token}'}

    with requests.get(url, headers=headers, stream=True) as response:
        response.raise_for_status()

        total_size = int(response.headers.get('Content-Length', 0))

        with open(output_path, 'wb') as file:
            downloaded = 0
            start_time = time.time()

            for chunk in response.iter_content(chunk_size=chunk_size):
                if chunk:
                    file.write(chunk)
                    downloaded += len(chunk)

                    # Calculate download speed
                    elapsed = time.time() - start_time
                    if elapsed > 0:
                        speed = downloaded / elapsed / 1024 / 1024  # MB/s

                        if total_size > 0:
                            percent = (downloaded / total_size) * 100
                            remaining = total_size - downloaded
                            eta = remaining / (downloaded / elapsed) if downloaded > 0 else 0

                            print(f"Progress: {percent:.1f}% | "
                                  f"Speed: {speed:.2f} MB/s | "
                                  f"ETA: {eta:.0f}s", end='\r')

    print(f"\n✅ Download complete: {output_path}")

Resume Interrupted Downloads

def download_with_resume(base_url, token, task_id, output_path):
    """Download with resume capability"""

    url = f"{base_url}/service/api/export/download_file/{task_id}"
    headers = {'Authorization': f'Token {token}'}

    # Check if partial file exists
    if os.path.exists(output_path):
        resume_pos = os.path.getsize(output_path)
        headers['Range'] = f'bytes={resume_pos}-'
        mode = 'ab'
        print(f"📂 Resuming download from byte {resume_pos}")
    else:
        resume_pos = 0
        mode = 'wb'

    response = requests.get(url, headers=headers, stream=True)

    # Check if server supports resume
    if response.status_code == 206:  # Partial Content
        print("✅ Server supports resume")
    elif response.status_code == 200:
        if resume_pos > 0:
            print("⚠️ Server doesn't support resume, restarting download")
            mode = 'wb'
    else:
        response.raise_for_status()

    with open(output_path, mode) as file:
        for chunk in response.iter_content(chunk_size=8192):
            if chunk:
                file.write(chunk)

    print(f"✅ Download complete: {output_path}")

File Processing

Process Downloaded CSV

import pandas as pd
import csv

def process_export_csv(filepath):
    """Process downloaded CSV file"""

    print(f"📊 Processing export file: {filepath}")

    # Quick analysis with pandas
    try:
        df = pd.read_csv(filepath)

        print(f"\n📈 File Statistics:")
        print(f"  Rows: {len(df):,}")
        print(f"  Columns: {len(df.columns)}")
        print(f"  Memory usage: {df.memory_usage().sum() / 1024 / 1024:.2f} MB")

        print(f"\n📋 Columns:")
        for col in df.columns:
            print(f"  - {col} ({df[col].dtype})")

        # Sample data
        print(f"\n👀 Sample data (first 5 rows):")
        print(df.head())

        return df

    except Exception as e:
        print(f"❌ Error processing CSV: {e}")
        return None

# Usage
df = process_export_csv("/exports/export_244596.csv")

Archive Exports

import zipfile
from datetime import datetime, timedelta

def archive_old_exports(export_dir, archive_dir, days_to_keep=7):
    """Archive exports older than specified days"""

    cutoff_date = datetime.now() - timedelta(days=days_to_keep)
    archived_count = 0

    # Create archive directory
    os.makedirs(archive_dir, exist_ok=True)

    # Create monthly archive
    archive_name = f"exports_{datetime.now():%Y_%m}.zip"
    archive_path = os.path.join(archive_dir, archive_name)

    with zipfile.ZipFile(archive_path, 'a', zipfile.ZIP_DEFLATED) as zipf:
        for filename in os.listdir(export_dir):
            filepath = os.path.join(export_dir, filename)

            # Check file age
            file_time = datetime.fromtimestamp(os.path.getmtime(filepath))

            if file_time < cutoff_date:
                # Add to archive
                zipf.write(filepath, filename)

                # Remove original
                os.remove(filepath)
                archived_count += 1

                print(f"📦 Archived: {filename}")

    print(f"\n✅ Archived {archived_count} files to {archive_path}")
    return archived_count

# Usage
archive_old_exports(
    export_dir="/exports/daily",
    archive_dir="/exports/archive",
    days_to_keep=7
)

Error Handling

Common Errors

Error Cause Solution
404 Not Found Invalid task ID or file not ready Verify task ID and completion status
401 Unauthorized Invalid token Check authentication
410 Gone Export file expired/deleted Re-run export
500 Internal Server Error Server issue Retry or contact support

Error Recovery

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10)
)
def download_with_retry(base_url, token, task_id, output_path):
    """Download with automatic retry on failure"""

    try:
        return download_export_file(base_url, token, task_id, output_path)
    except requests.exceptions.RequestException as e:
        print(f"⚠️ Download attempt failed: {e}")
        raise

Best Practices

1. Validate Before Download

def validate_before_download(task_id):
    """Validate task is ready for download"""

    # Check task status
    status = check_task_status(task_id)

    if status['status'] != 'S':
        raise ValueError(f"Task {task_id} not successful: {status['status']}")

    if status['percentCompleted'] != '100':
        raise ValueError(f"Task {task_id} not complete: {status['percentCompleted']}%")

    return True

2. Implement Download Queue

import queue
import threading

class DownloadQueue:
    """Queue multiple downloads"""

    def __init__(self, base_url, token, max_concurrent=3):
        self.base_url = base_url
        self.token = token
        self.queue = queue.Queue()
        self.max_concurrent = max_concurrent

    def add_download(self, task_id, output_dir):
        """Add download to queue"""
        self.queue.put((task_id, output_dir))

    def process_downloads(self):
        """Process download queue"""

        threads = []
        for _ in range(self.max_concurrent):
            t = threading.Thread(target=self._worker)
            t.daemon = True
            t.start()
            threads.append(t)

        # Wait for completion
        self.queue.join()

    def _worker(self):
        """Worker thread for downloads"""
        while True:
            task_id, output_dir = self.queue.get()

            try:
                filepath = download_export_file(
                    self.base_url, self.token, task_id, output_dir
                )
                print(f"✅ Downloaded: {filepath}")
            except Exception as e:
                print(f"❌ Failed to download {task_id}: {e}")
            finally:
                self.queue.task_done()

3. Verify File Integrity

import hashlib

def verify_download_integrity(filepath, expected_hash=None):
    """Verify downloaded file integrity"""

    # Calculate file hash
    sha256_hash = hashlib.sha256()

    with open(filepath, 'rb') as file:
        for chunk in iter(lambda: file.read(4096), b""):
            sha256_hash.update(chunk)

    file_hash = sha256_hash.hexdigest()

    if expected_hash:
        if file_hash == expected_hash:
            print(f"✅ File integrity verified")
            return True
        else:
            print(f"❌ File integrity check failed")
            return False
    else:
        print(f"📋 File SHA256: {file_hash}")
        return file_hash

Next Steps

After downloading export files: 1. Validate file content and format 2. Process data as needed 3. Archive or move to permanent storage 4. Clean up temporary files