Azure Functions Isolated でのシングルトン

.NET用のAzure Functionsは元々ホストが実行されるプロセスで関数を実行していたが、ホストとワーカーが別プロセスで動作するIsolatedタイプが今後主流になるということで、仕事で新しく作るFunctionsはIsolatedでやっている。

ただまだまだIsolatedが発展途上なのか困りごとにもしばしば直面する。

よく書く処理で一番こたえがたいのがSingletonがサポートされていないことだ。

Singletonは便利なやつで、どんなにスケールアウトされていても、並列実行数を限定することができるし、同一IDは直列に処理したい時に便利だ。

 a)

 +-----------------+  +-----------------+  +-----------------+
 |ORDER ID: 100    |  |ORDER ID: 100    |  |ORDER ID: 101    |
 |TIMESTAMP: 3s ago|  |TIMESTAMP: 2s ago|  |TIMESTAMP: 1s ago|
 +-------+---------+  +-------+---------+  +--------+--------+
         |                    |                     |
 +-------v--------------------v---------------------v--------+
 |                                                           |
 |                      Azure Functions                      |
 |                                                           |
 +-----------------------------------------------------------+

 b)

 +-----------------+
 |ORDER ID: 100    |
 |TIMESTAMP: 2s ago|
 +-------+---------+
         |
 +-------v---------+                       +-----------------+
 |ORDER ID: 100    |                       |ORDER ID: 101    |
 |TIMESTAMP: 3s ago|                       |TIMESTAMP: 1s ago|
 +-------+---------+                       +--------+--------+
         |                                          |
 +-------v------------------------------------------v--------+
 |                                                           |
 |                      Azure Functions                      |
 |                                                           |
 +-----------------------------------------------------------+

a) はIsolatedで発生してしまう動き、 b)はもともとホスト実行型のAzure Functionsでできてた便利な処理順制御。

やっぱりb)を使いたい。というより重複実行が問題になる処理をFunctionsに載せたいからb)じゃないと困る。

ないなら書くしかなかろうも。少々汚いけども。

public class LockService
{
    const string CONNECTION_STRING_NAME = "AzureWebJobsStorage";
    const string CONTAINER_NAME = "azure-webjobs-hosts";
    private readonly BlobContainerClient _blobContainerClient;
    public LockService(IConfiguration configuration)
    {
        var serviceClient = new BlobServiceClient(configuration[CONNECTION_STRING_NAME]);
        _blobContainerClient = serviceClient.GetBlobContainerClient(CONTAINER_NAME);

        // azure-webjobs-hostsコンテナがないなんてことはないはず
        // _blobContainerClient.CreateIfNotExists(); 
    }

    public async ValueTask<Lock> Lock(string filename, CancellationToken cancellationToken)
    {
        var client = _blobContainerClient.GetBlockBlobClient(filename);
        if (!await client.ExistsAsync().ConfigureAwait(false))
        {
            try
            {
                await client.UploadAsync(new MemoryStream(Array.Empty<byte>())).ConfigureAwait(false);
            }
            catch (Azure.RequestFailedException ex)
            {
                // 並列で実行されている時は発生し得る
                // Leaseがとられてる場合は412が返るのでそれは無視。
                if (ex.Status != 412)
                    throw;
            }
        }
        var @lock = new Lock(client.GetBlobLeaseClient());
        await @lock.StartAsync(cancellationToken).ConfigureAwait(false);
        return @lock;
    }
}

public class Lock : IAsyncDisposable
{
    private readonly BlobLeaseClient _blobLeaseClient;
    private CancellationTokenSource? _cancellationTokenSource = null;
    private Task? _task = null;

    public Lock(BlobLeaseClient blobLeaseClient)
    {
        _blobLeaseClient = blobLeaseClient;

    }
    public async ValueTask StartAsync(CancellationToken cancellationToken)
    {
        if (_task != null)
        {
            throw new InvalidOperationException("Already Locked!");
        }
        while (!cancellationToken.IsCancellationRequested)
        {
            Azure.Response<BlobLease>? response = null;
            try
            {
                // リース時間は60秒がマックスらしい
                response = await _blobLeaseClient.AcquireAsync(TimeSpan.FromSeconds(60), cancellationToken: cancellationToken).ConfigureAwait(false);
            }
            catch (Azure.RequestFailedException ex)
            {
                // 既に誰かがリース済みなら409が返ってくる。
                // 409ならリースが解放されるまで待ちたいのでthrowしない
                if (ex.Status != 409)
                    throw;
            }
            if (response != null && response.Value != null && !string.IsNullOrEmpty(response.Value.LeaseId))
            {
                _cancellationTokenSource = new CancellationTokenSource();
                var cts = CancellationTokenSource.CreateLinkedTokenSource(_cancellationTokenSource.Token, cancellationToken);
                // 処理が60秒で終わらない可能性があるので、バックグラウンドでリースを更新し続ける。
                _task = Task.Run(async () => {
                    var renewed = true;
                    while (!cts.Token.IsCancellationRequested)
                    {
                        await Task.Delay(TimeSpan.FromSeconds(renewed ? 30 : 1), cts.Token).ConfigureAwait(false);
                        try
                        {
                            await _blobLeaseClient.RenewAsync(cancellationToken: cts.Token).ConfigureAwait(false);
                            renewed = true;
                        }
                        catch
                        {
                            renewed = false;
                        }
                    }
                }, cts.Token);
                return;  
            }
            else
            {
                // 誰かがリース済みなら、10秒待ち
                await Task.Delay(TimeSpan.FromSeconds(10)).ConfigureAwait(false);
            }
        }
    }
    public async ValueTask FinishAsync(CancellationToken cancellationToken)
    {
        if (_task != null && _cancellationTokenSource != null)
        {
                        _task = null;
            _cancellationTokenSource.Cancel();
                        await _blobLeaseClient.ReleaseAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
        }

    }
    public async ValueTask DisposeAsync()
    {
        await FinishAsync(CancellationToken.None).ConfigureAwait(false);
    }
}

使い方

await using var @lock = await _lockService.Lock($"path/to/the/{id}.lock", CancellationToken.None);
// 以降はロックが取れてる状態

このセルフロックの問題点は、BlobInputとかのInputバインドが使えなくなること。
ロックが取れた後に自分でブロブやらテーブルから取得しないと、古いデータで処理することになってしまう。
だからほんとは関数実行前にホスト側でやってほしい。

注意点
ロックによって確保されるのは、同じリースIDが同時実行されないということだけなので、実行される順序自体は不定。
だからステートを内包しているキューでそのステートに順序性があるものには使用できない。 まあそれは従来のFunctionでも一緒のはず。


You'll only receive email when they publish something new.

More from iwate
All posts