Direct-to-S3 Node.js サンプル

Logo mashup showing Autodesk Forge, Node.js, and AWS S3 storage integration.

Data Management OSS (Object Storage Service) の Direct-to-S3 アプローチへの移行についてのアナウンスがありましたので、この移行をよりスムーズにおこなっていただくための情報をご提供したいと思います。今回は、Autodesk Forge サービスにおける新しいバイナリ転送のための Node.js ユーティリティについてです。 これらのサンプルは、LTS バージョンの Node.js を使用してビルドされています。

チームはまた、Direct-to-S3 アプローチを使用する新しいSDKの開発にも取り組んでいます。 

チームの Petr Broz は、OSS Direct-to-S3 アプローチのために新しくリリースされたすべてのエンドポイントを含む、キュレーションされたユーティリティファイルに取り組みました。 

Github のリポジトリはこちら、その中の Node.js ブランチはこちらで利用可能です。 

Index.js (ユーティリテイ ファイル)

const axios = require('axios');
const rax = require('retry-axios');
class BinaryTransferClient {
    /**
    * Creates a new instance of the binary transfer helper client.
    *
    * Note that the provided access token will be used for all requests initiated
    * by this client. For long-running operations the token could potentially expire,
    * so consider modifying this class to refresh the token whenever needed.
    *
    * @param {string} token Access token to use when communicating with Autodesk Forge services.
    * @param {string} [host="https://developer.api.autodesk.com"] Optional Autodesk Forge host).
    */
    constructor(token, host) {
        this.token = token;
        this.axios = axios.create({
            baseURL: (host || 'https://developer.api.autodesk.com') + '/oss/v2/'
        });
        // Attach an interceptor to the axios instance that will retry response codes 100-199, 429, and 500-599.
        // For default settings, see https://github.com/JustinBeckwith/retry-axios#usage.
        this.axios.defaults.raxConfig = {
            instance: this.axios
        };
        rax.attach(this.axios);
    }
    /**
    * Generates one or more signed URLs that can be used to upload a file (or its parts) to OSS,
    * and an upload key that is used to generate additional URLs or in {@see _completeUpload}
    * after all the parts have been uploaded successfully.
    *
    * Note that if you are uploading in multiple parts, each part except for the final one
    * must be of size at least 5MB, otherwise the call to {@see _completeUpload} will fail.
    *
    * @async
    * @param {string} bucketKey Bucket key.
    * @param {string} objectKey Object key.
    * @param {number} [parts=1] How many URLs to generate in case of multi-part upload.
    * @param {number} [firstPart=1] Index of the part the first returned URL should point to.
    * For example, to upload parts 10 through 15 of a file, use `firstPart` = 10 and `parts` = 6.
    * @param {string} [uploadKey] Optional upload key if this is a continuation of a previously
    * initiated upload.
    * @param {number} [minutesExpiration] Custom expiration for the upload URLs
    * (within the 1 to 60 minutes range). If not specified, default is 2 minutes.
    * @returns {Promise<object>} Signed URLs for uploading chunks of the file to AWS S3,
    * and a unique upload key used to generate additional URLs or to complete the upload.
    */
    async _getUploadUrls(bucketKey, objectKey, parts = 1, firstPart = 1, uploadKey, minutesExpiration) {
        let endpoint = `buckets/${bucketKey}/objects/${encodeURIComponent(objectKey)}/signeds3upload?parts=${parts}&firstPart=${firstPart}`;
        if (uploadKey) {
            endpoint += `&uploadKey=${uploadKey}`;
        }
        if (minutesExpiration) {
            endpoint += `&minutesExpiration=${minutesExpiration}`;
        }
        const headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + this.token
        };
        const resp = await this.axios.get(endpoint, { headers });
        return resp.data;
    }
    /**
    * Finalizes the upload of a file to OSS.
    *
    * @async
    * @param {string} bucketKey Bucket key.
    * @param {string} objectKey Object key.
    * @param {string} uploadKey Upload key returned by {@see _getUploadUrls}.
    * @param {string} [contentType] Optinal content type that should be recorded for the uploaded file.
    * @returns {Promise<object>} Details of the created object in OSS.
    */
    async _completeUpload(bucketKey, objectKey, uploadKey, contentType) {
        const endpoint = `buckets/${bucketKey}/objects/${encodeURIComponent(objectKey)}/signeds3upload`;
        const payload = { uploadKey };
        const headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + this.token
        };
        if (contentType) {
            headers['x-ads-meta-Content-Type'] = contentType;
        }
        const resp = await this.axios.post(endpoint, payload, { headers });
        return resp.data;
    }
    /**
    * Uploads content to a specific bucket object.
    *
    * @async
    * @param {string} bucketKey Bucket key.
    * @param {string} objectKey Name of uploaded object.
    * @param {Buffer} data Object content.
    * @param {object} [options] Additional upload options.
    * @param {string} [options.contentType] Optional content type of the uploaded file.
    * @param {number} [options.minutesExpiration] Custom expiration for the upload URLs
    * (within the 1 to 60 minutes range). If not specified, default is 2 minutes.
    * @returns {Promise<object>} Object description containing 'bucketKey', 'objectKey', 'objectId',
    * 'sha1', 'size', 'location', and 'contentType'.
    * @throws Error when the request fails, for example, due to insufficient rights, or incorrect scopes.
    */
    async uploadObject(bucketKey, objectKey, data, options) {
        console.assert(data.byteLength > 0);
        const ChunkSize = 5 << 20;
        const MaxBatches = 25;
        const totalParts = Math.ceil(data.byteLength / ChunkSize);
        let partsUploaded = 0;
        let uploadUrls = [];
        let uploadKey;
        while (partsUploaded < totalParts) {
            const chunk = data.slice(partsUploaded * ChunkSize, Math.min((partsUploaded + 1) * ChunkSize, data.byteLength));
            while (true) {
                console.debug('Uploading part', partsUploaded + 1);
                if (uploadUrls.length === 0) {
                    // Automatically retries 429 and 500-599 responses
                    const uploadParams = await this._getUploadUrls(bucketKey, objectKey, Math.min(totalParts - partsUploaded, MaxBatches), partsUploaded + 1, uploadKey, options?.minutesExpiration);
                    uploadUrls = uploadParams.urls.slice();
                    uploadKey = uploadParams.uploadKey;
                }
                const url = uploadUrls.shift();
                try {
                    await this.axios.put(url, chunk);
                    break;
                } catch (err) {
                    const status = err.response?.status;
                    if (status === 403) {
                        console.debug('Got 403, refreshing upload URLs');
                        uploadUrls = []; // Couldn't this cause an infinite loop? (i.e., could the server keep responding with 403 indefinitely?)
                    } else {
                        throw err;
                    }
                }
            }
            console.debug('Part successfully uploaded', partsUploaded + 1);
            partsUploaded++;
        }
        console.debug('Completing part upload');
        return this._completeUpload(bucketKey, objectKey, uploadKey, options?.contentType);
    }
    /**
    * Uploads content stream to a specific bucket object.
    *
    * @async
    * @param {string} bucketKey Bucket key.
    * @param {string} objectKey Name of uploaded object.
    * @param {AsyncIterable<Buffer>} stream Input stream.
    * @param {object} [options] Additional upload options.
    * @param {string} [options.contentType] Optional content type of the uploaded file.
    * @param {number} [options.minutesExpiration] Custom expiration for the upload URLs
    * (within the 1 to 60 minutes range). If not specified, default is 2 minutes.
    * @returns {Promise<object>} Object description containing 'bucketKey', 'objectKey', 'objectId',
    * 'sha1', 'size', 'location', and 'contentType'.
    * @throws Error when the request fails, for example, due to insufficient rights, or incorrect scopes.
    */
    async uploadObjectStream(bucketKey, objectKey, input, options) {
        // Helper async generator making sure that each chunk has at least certain number of bytes
        async function* bufferChunks(input, minChunkSize) {
            let buffer = Buffer.alloc(2 * minChunkSize);
            let bytesRead = 0;
            for await (const chunk of input) {
                chunk.copy(buffer, bytesRead);
                bytesRead += chunk.byteLength;
                if (bytesRead >= minChunkSize) {
                    yield buffer.slice(0, bytesRead);
                    bytesRead = 0;
                }
            }
            if (bytesRead > 0) {
                yield buffer.slice(0, bytesRead);
            }
        }
        const MaxBatches = 25;
        const ChunkSize = 5 << 20;
        let partsUploaded = 0;
        let uploadUrls = [];
        let uploadKey;
        for await (const chunk of bufferChunks(input, ChunkSize)) {
            while (true) {
                console.debug('Uploading part', partsUploaded + 1);
                if (uploadUrls.length === 0) {
                    const uploadParams = await this._getUploadUrls(bucketKey, objectKey, MaxBatches, partsUploaded + 1, uploadKey, options?.minutesExpiration);
                    uploadUrls = uploadParams.urls.slice();
                    uploadKey = uploadParams.uploadKey;
                }
                const url = uploadUrls.shift();
                try {
                    await this.axios.put(url, chunk);
                    break;
                } catch (err) {
                    const status = err.response?.status;
                    if (status === 403) {
                        console.debug('Got 403, refreshing upload URLs');
                        uploadUrls = []; // Couldn't this cause an infinite loop? (i.e., could the server keep responding with 403 indefinitely?
                    } else {
                        throw err;
                    }
                }
            }
            console.debug('Part successfully uploaded', partsUploaded + 1);
            partsUploaded++;
        }
        console.debug('Completing part upload');
        return this._completeUpload(bucketKey, objectKey, uploadKey, options?.contentType);
    }
    /**
    * Generates a signed URL that can be used to download a file from OSS.
    *
    * @async
    * @param {string} bucketKey Bucket key.
    * @param {string} objectKey Object key.
    * @param {number} [minutesExpiration] Custom expiration for the download URLs
    * (within the 1 to 60 minutes range). If not specified, default is 2 minutes.
    * @returns {Promise<object>} Download URLs and potentially other helpful information.
    */
    async _getDownloadUrl(bucketKey, objectKey, minutesExpiration) {
        let endpoint = `buckets/${bucketKey}/objects/${encodeURIComponent(objectKey)}/signeds3download`;
        if (minutesExpiration) {
            endpoint += `?minutesExpiration=${minutesExpiration}`;
        }
        const headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + this.token
        };
        const resp = await this.axios.get(endpoint, { headers });
        return resp.data;
    }
    /**
    * Downloads a specific OSS object.
    *
    * @async
    * @param {string} bucketKey Bucket key.
    * @param {string} objectKey Object key.
    * @param {object} [options] Additional download options.
    * @param {number} [options.minutesExpiration] Custom expiration for the download URLs
    * (within the 1 to 60 minutes range). If not specified, default is 2 minutes.
    * @returns {Promise<ArrayBuffer>} Object content.
    * @throws Error when the request fails, for example, due to insufficient rights, or incorrect scopes.
    */
    async downloadObject(bucketKey, objectKey, options) {
        console.debug('Retrieving download URL');
        const downloadParams = await this._getDownloadUrl(bucketKey, objectKey, options?.minutesExpiration);
        if (downloadParams.status !== 'complete') {
            throw new Error('File not available for download yet.');
        }
        const resp = await this.axios.get(downloadParams.url, {
            responseType: 'arraybuffer',
            onDownloadProgress: progressEvent => {
                const downloadedBytes = progressEvent.currentTarget.response.length;
                const totalBytes = parseInt(progressEvent.currentTarget.responseHeaders['Content-Length']);
                console.debug('Downloaded', downloadedBytes, 'bytes of', totalBytes);
            }
        });
        return resp.data;
    }
    /**
    * Downloads content stream of a specific bucket object.
    *
    * @async
    * @param {string} bucketKey Bucket key.
    * @param {string} objectKey Object name.
    * @param {object} [options] Additional download options.
    * @param {number} [options.minutesExpiration] Custom expiration for the download URLs
    * (within the 1 to 60 minutes range). If not specified, default is 2 minutes.
    * @returns {Promise<ReadableStream>} Object content stream.
    * @throws Error when the request fails, for example, due to insufficient rights, or incorrect scopes.
    */
    async downloadObjectStream(bucketKey, objectKey, options) {
        console.debug('Retrieving download URL');
        const downloadParams = await this._getDownloadUrl(bucketKey, objectKey, options?.minutesExpiration);
        if (downloadParams.status !== 'complete') {
            throw new Error('File not available for download yet.');
        }
        const resp = await this.axios.get(downloadParams.url, {
            responseType: 'stream',
            onDownloadProgress: progressEvent => {
                const downloadedBytes = progressEvent.currentTarget.response.length;
                const totalBytes = parseInt(progressEvent.currentTarget.responseHeaders['Content-Length']);
                console.debug('Downloaded', downloadedBytes, 'bytes of', totalBytes);
            }
        });
        return resp.data;
    }
}
module.exports = {
    BinaryTransferClient
};

署名済み URL(Signed URL)のデフォルトの有効期限は2分です(minutesExpiration パラメータで最大60分まで有効期限を延長することができます)。 

ダウンロード

まず、ダウンロードの手順からご紹介します。AWS S3 から署名済み URL(Signed URL)を使ってファイルを直接ダウンロードするために、2つのステップを踏む必要があります。以下は、その仕組みを説明する擬似コードです。

  1. GET buckets/:bucketKey/objects/:objectName/signeds3download エンドポイントを使ってダウンロード用の URL を生成します。
  2. 新しいURLを使用して、AWS S3 から直接 OSS オブジェクトをダウンロードします。
    • レスポンス コードが 100~199、429、500~599 の場合、ダウンロードの再試行(例えば指数関数的バックオフ)を検討する。

以下は、ストリームのダウンロードをおこなう場合のコードです。

const fs = require('fs');
const { BinaryTransferClient } = require('..');
async function downloadStream(filePath, bucketKey, objectKey, accessToken) {
    const client = new BinaryTransferClient(accessToken);
    const stream = await client.downloadObjectStream(bucketKey, objectKey);
    stream.pipe(fs.createWriteStream(filePath));
}
if (process.argv.length < 6) {
    console.log('Usage:');
    console.log('node ' + __filename + ' <path to local file> <bucket key> <object key> <access token>');
    process.exit(0);
}
downloadStream(process.argv[2], process.argv[3], process.argv[4], process.argv[5]);

また、オブジェクトをローカルファイルにダウンロードする場合(最初にファイル全体をメモリに受信する)には、次のようなコードを使用します。

const fs = require('fs');
const { BinaryTransferClient } = require('..');
async function downloadBuffer(filePath, bucketKey, objectKey, accessToken) {
    const client = new BinaryTransferClient(accessToken);
    const buffer = await client.downloadObject(bucketKey, objectKey);
    fs.writeFileSync(filePath, buffer);
}
if (process.argv.length < 6) {
    console.log('Usage:');
    console.log('node ' + __filename + ' <path to local file> <bucket key> <object key> <access token>');
    process.exit(0);
}
downloadBuffer(process.argv[2], process.argv[3], process.argv[4], process.argv[5])
    .then(_ => 'Done!')
    .catch(err => console.error(err));

アップロード

次にアップロードの手順をご紹介します。AWS S3 から署名付き URL(Signed URL)を使って直接ファイルをアップロードするには、3 つのステップを踏む必要があります。以下は、その仕組みを説明する擬似コードです。 

  1. アップロードするファイルのパーツ数を算出
    •  注意:最後の 1 つを除き、アップロードする各パーツは 5 MB 以上であること
  2. GET buckets/:bucketKey/objects/:objectKey/signeds3upload?firstPart=<index of first part>&parts=<number of parts> エンドポイントを使用して特定のパーツのファイルをアップロードするための、最大 25 の URL を生成
    • パーツ番号は 1 から始まるものと仮定   
    • 例えば、10 番パーツから 15 番パーツまでのアップロード用 URL を生成するには、<index of first part> を 10 に、<number of parts> を 6 に設定  
    • このエンドポイントは、後で追加の URL を要求したり、アップロードを確定するために使用する uploadKey も返す  
  3. 残りのパーツ ファイルを、対応するアップロード URL にアップロード
    • レスポンスコードが 100~199、429、500~599 の場合、個々のアップロードの再試行を検討する(例えば指数関数的バックオフを使用)
    • レスポンスコードが 403 の場合、アップロード用 URL の有効期限が切れているため、上記手順 2. へ戻る  
    • アップロード用 URL をすべて使い切ってしまい、まだアップロードする必要があるパーツが存在する場合、手順 2. に戻って URL を生成する  
  4. POST buckets/:bucketKey/objects/:objectKey/signeds3upload エンドポイントを使用して、ステップ 2. からの uploadKey 値を使用してアップロードを確定させる

下記は、ローカルファイルを OSS Bucket に(ストリームとして)アップロードする場合のコードです。

const fs = require('fs');
const { BinaryTransferClient } = require('..');
async function uploadStream(filePath, bucketKey, objectKey, accessToken) {
    const client = new BinaryTransferClient(accessToken);
    const stream = fs.createReadStream(filePath);
    const object = await client.uploadObjectStream(bucketKey, objectKey, stream);
    return object;
}
if (process.argv.length < 6) {
    console.log('Usage:');
    console.log('node ' + __filename + ' <path to local file> <bucket key> <object key> <access token>');
    process.exit(0);
}
uploadStream(process.argv[2], process.argv[3], process.argv[4], process.argv[5])
    .then(obj => console.log(obj))
    .catch(err => console.error(err));

また、ローカルファイルを OSS Bucket にアップロードしたい場合(最初にファイル全体をメモリに読み込む)には、次のようにします。

const fs = require('fs');
const { BinaryTransferClient } = require('..');
async function uploadBuffer(filePath, bucketKey, objectKey, accessToken) {
const client = new BinaryTransferClient(accessToken);
const buffer = fs.readFileSync(filePath);
const object = await client.uploadObject(bucketKey, objectKey, buffer);
return object;
}
if (process.argv.length < 6) {
console.log('Usage:');
console.log('node ' + __filename + ' <path to local file> <bucket key> <object key> <access token>');
process.exit(0);
}
uploadBuffer(process.argv[2], process.argv[3], process.argv[4], process.argv[5])
.then(obj => console.log(obj))
.catch(err => console.error(err));

また、Data Management API で Hub(BIM 360、Fusion Teams、ACC など)にローカルファイルをアップロードする方法も忘れてはいけません。

const fs = require('fs');
const path = require('path');
const { ProjectsApi, FoldersApi, ItemsApi, VersionsApi } = require('forge-apis');
const { BinaryTransferClient } = require('..');
async function getFolderContents(projectId, folderId, getAccessToken) {
    const resp = await new FoldersApi().getFolderContents(projectId, folderId, {}, null, getAccessToken());
    return resp.body.data;
}
async function createStorage(projectId, folderId, displayName, getAccessToken) {
    const body = {
        jsonapi: {
            version: '1.0'
        },
        data: {
            type: 'objects',
            attributes: {
                name: displayName
            },
            relationships: {
                target: {
                    data: {
                        type: 'folders',
                        id: folderId
                    }
                }
            }
        }
    };
    const resp = await new ProjectsApi().postStorage(projectId, body, null, getAccessToken());
    return resp.body.data;
}
async function createItem(projectId, folderId, objectId, displayName, getAccessToken) {
    const body = {
        jsonapi: {
            version: '1.0'
        },
        data: {
            type: 'items',
            attributes: {
                displayName,
                extension: {
                    type: 'items:autodesk.core:File',
                    version: '1.0'
                }
            },
            relationships: {
                tip: {
                    data: {
                        type: 'versions',
                        id: '1'
                    }
                },
                parent: {
                    data: {
                        type: 'folders',
                        id: folderId
                    }
                }
            }
        },
        included: [
            {
                type: 'versions',
                id: '1',
                attributes: {
                    name: displayName,
                    extension: {
                        type: 'versions:autodesk.core:File',
                        version: '1.0'
                    }
                },
                relationships: {
                    storage: {
                        data: {
                            type: 'objects',
                            id: objectId
                        }
                    }
                }
            }
        ]
    };
    const resp = await new ItemsApi().postItem(projectId, body, null, getAccessToken());
    return resp.body.data;
}
async function createVersion(projectId, lineageId, objectId, displayName, getAccessToken) {
    const body = {
        jsonapi: {
            version: '1.0'
        },
        data: {
            type: 'versions',
            attributes: {
                name: displayName,
                extension: {
                    type: 'versions:autodesk.core:File',
                    version: '1.0'
                }
            },
            relationships: {
                item: {
                    data: {
                        type: 'items',
                        id: lineageId
                    }
                },
                storage: {
                    data: {
                        type: 'objects',
                        id: objectId
                    }
                }
            }
        }
    };
    const resp = await new VersionsApi().postVersion(projectId, body, null, getAccessToken());
    return resp.body.data;
}
async function upload(filePath, projectId, folderId, accessToken) {
    const displayName = path.basename(filePath);
    const getAccessToken = () => {
        return { access_token: accessToken };
    };
    console.log('Creating storage...');
    const storage = await createStorage(projectId, folderId, displayName, getAccessToken);
    console.log(storage);
    const match = /urn:adsk.objects:os.object:([^\/]+)\/(.+)/.exec(storage.id);
    if (!match || match.length < 3) {
        throw new Error('Unexpected storage ID', storage.id);
    }
    const bucketKey = match[1];
    const objectKey = match[2];
    console.log('Uploading file...');
    const client = new BinaryTransferClient(accessToken);
    const object = await client.uploadObject(bucketKey, objectKey, fs.readFileSync(filePath));
    console.log(object);
    console.log('Checking if file already exists...');
    const contents = await getFolderContents(projectId, folderId, getAccessToken);
    const item = contents.find(e => e.type === 'items' && e.attributes.displayName === displayName);
    if (!item) {
        console.log('Creating new item...');
        const lineage = await createItem(projectId, folderId, object.objectId, displayName, getAccessToken);
        console.log(lineage);
    } else {
        console.log('Creating new item version...');
        const version = await createVersion(projectId, item.id, object.objectId, displayName, getAccessToken);
        console.log(version);
    }
}
if (process.argv.length < 6) {
    console.log('Usage:');
    console.log('node ' + __filename + ' <path to local file> <project id> <folder id> <access token>');
    process.exit(0);
}
upload(process.argv[2], process.argv[3], process.argv[4], process.argv[5])
    .then(obj => console.log('Done!'))
    .catch(err => console.error(err));

ご不明な点等ございましたら、forge.help@autodesk.com.までお問い合わせください。 

※ 本記事は Direct-to-S3 Node.js samples | Autodesk Forge から転写・翻訳して一部加筆したものです。

By Toshiaki Isezaki

Discover more from Autodesk Developer Blog

Subscribe now to keep reading and get access to the full archive.

Continue reading