From 3c758a12d0cd3b18c39fc9fa71b5a10d6cc2444d Mon Sep 17 00:00:00 2001
From: "Chanyub.Park"
Date: Wed, 7 Aug 2024 03:22:51 +0900
Subject: [PATCH] add metadata blob_storage_total_files and blob_storage_file_index on azure blob storage input

---
 internal/impl/azure/input_blob_storage.go | 52 ++++++++++++++++++++++-
 1 file changed, 50 insertions(+), 2 deletions(-)

diff --git a/internal/impl/azure/input_blob_storage.go b/internal/impl/azure/input_blob_storage.go
index 93ea8f9e9..130fb73fc 100644
--- a/internal/impl/azure/input_blob_storage.go
+++ b/internal/impl/azure/input_blob_storage.go
@@ -391,6 +391,12 @@ type azureBlobStorage struct {
 	object    *azurePendingObject
 
 	log *service.Logger
+
+	// totalFiles is the number of targets counted during the last Connect.
+	totalFiles int
+	// currentIndex is the zero-based counter stamped on batches as
+	// blob_storage_file_index.
+	currentIndex int
 }
 
 func newAzureBlobStorage(conf bsiConfig, log *service.Logger) (*azureBlobStorage, error) {
@@ -398,6 +404,9 @@ func newAzureBlobStorage(conf bsiConfig, log *service.Logger) (*azureBlobStorage
 		conf:              conf,
 		objectScannerCtor: conf.Codec,
 		log:               log,
+
+		totalFiles:   0,
+		currentIndex: 0,
 	}
 	return a, nil
 }
@@ -405,7 +414,35 @@ func newAzureBlobStorage(conf bsiConfig, log *service.Logger) (*azureBlobStorage
 func (a *azureBlobStorage) Connect(ctx context.Context) error {
 	var err error
 	a.keyReader, err = newAzureTargetReader(ctx, a.log, a.conf)
+	if err != nil {
+		// Without this guard the counting loop below would call Pop on a
+		// reader that failed to construct.
+		return err
+	}
+
+	// Drain the fresh reader once to count the targets. The tally lives in a
+	// local and is published only after a complete pass, so a reconnect
+	// replaces the previous total instead of adding to it.
+	// NOTE(review): this assumes Pop eventually reports io.EOF; confirm that
+	// holds for every reader newAzureTargetReader can build.
+	total := 0
+	for {
+		_, popErr := a.keyReader.Pop(ctx)
+		if popErr == io.EOF {
+			break
+		}
+		if popErr != nil {
+			return popErr
+		}
+		total++
+	}
+	a.totalFiles = total
+	a.currentIndex = 0
+
+	// The counting pass consumed the reader, so build a second one for the
+	// actual reads.
+	a.keyReader, err = newAzureTargetReader(ctx, a.log, a.conf)
 	return err
 }
 
 func (a *azureBlobStorage) getObjectTarget(ctx context.Context) (*azurePendingObject, error) {
@@ -438,7 +475,9 @@ func (a *azureBlobStorage) getObjectTarget(ctx context.Context) (*azurePendingOb
 	return object, nil
 }
 
-func blobStorageMetaToBatch(p *azurePendingObject, containerName string, parts service.MessageBatch) {
+// blobStorageMetaToBatch stamps every message in parts with metadata about
+// the blob it came from, plus the file total and index tracked on a.
+func blobStorageMetaToBatch(a *azureBlobStorage, p *azurePendingObject, containerName string, parts service.MessageBatch) {
 	for _, part := range parts {
 		part.MetaSetMut("blob_storage_key", p.target.key)
 		part.MetaSetMut("blob_storage_container", containerName)
@@ -456,6 +495,10 @@ func blobStorageMetaToBatch(p *azurePendingObject, containerName string, parts s
 		for k, v := range p.obj.Metadata {
 			part.MetaSetMut(k, v)
 		}
+
+		// Add total files count and current file index
+		part.MetaSetMut("blob_storage_total_files", a.totalFiles)
+		part.MetaSetMut("blob_storage_file_index", a.currentIndex)
 	}
 }
 
@@ -500,7 +543,12 @@ func (a *azureBlobStorage) ReadBatch(ctx context.Context) (msg service.MessageBa
 		}
 	}
 
-	blobStorageMetaToBatch(object, a.conf.Container, parts)
+	blobStorageMetaToBatch(a, object, a.conf.Container, parts)
+
+	// NOTE(review): this advances once per returned batch. If one blob can
+	// yield several batches the value will run ahead of a per-file index;
+	// confirm against the scanner in use.
+	a.currentIndex++
 
 	return parts, func(rctx context.Context, res error) error {
 		return scnAckFn(rctx, res)