.NET用のAzure Functionsは元々ホストが実行されるプロセスで関数を実行していたが、ホストとワーカーが別プロセスで動作するIsolatedタイプが今後主流になるということで、仕事で新しく作るFunctionsはIsolatedでやっている。
ただまだまだIsolatedが発展途上なのか困りごとにもしばしば直面する。
よく書く処理で一番こたえがたいのがSingletonがサポートされていないことだ。
Singletonは便利なやつで、どんなにスケールアウトされていても、並列実行数を限定することができるし、同一IDは直列に処理したい時に便利だ。
a)
+-----------------+ +-----------------+ +-----------------+
|ORDER ID: 100 | |ORDER ID: 100 | |ORDER ID: 101 |
|TIMESTAMP: 3s ago| |TIMESTAMP: 2s ago| |TIMESTAMP: 1s ago|
+-------+---------+ +-------+---------+ +--------+--------+
| | |
+-------v--------------------v---------------------v--------+
| |
| Azure Functions |
| |
+-----------------------------------------------------------+
b)
+-----------------+
|ORDER ID: 100 |
|TIMESTAMP: 2s ago|
+-------+---------+
|
+-------v---------+ +-----------------+
|ORDER ID: 100 | |ORDER ID: 101 |
|TIMESTAMP: 3s ago| |TIMESTAMP: 1s ago|
+-------+---------+ +--------+--------+
| |
+-------v------------------------------------------v--------+
| |
| Azure Functions |
| |
+-----------------------------------------------------------+
a) はIsolatedで発生してしまう動き、 b)はもともとホスト実行型のAzure Functionsでできてた便利な処理順制御。
やっぱりb)を使いたい。というより重複実行が問題になる処理をFunctionsに載せたいからb)じゃないと困る。
ないなら書くしかなかろうも。少々汚いけども。
public class LockService
{
const string CONNECTION_STRING_NAME = "AzureWebJobsStorage";
const string CONTAINER_NAME = "azure-webjobs-hosts";
private readonly BlobContainerClient _blobContainerClient;
public LockService(IConfiguration configuration)
{
var serviceClient = new BlobServiceClient(configuration[CONNECTION_STRING_NAME]);
_blobContainerClient = serviceClient.GetBlobContainerClient(CONTAINER_NAME);
// azure-webjobs-hostsコンテナがないなんてことはないはず
// _blobContainerClient.CreateIfNotExists();
}
public async ValueTask<Lock> Lock(string filename, CancellationToken cancellationToken)
{
var client = _blobContainerClient.GetBlockBlobClient(filename);
if (!await client.ExistsAsync().ConfigureAwait(false))
{
try
{
await client.UploadAsync(new MemoryStream(Array.Empty<byte>())).ConfigureAwait(false);
}
catch (Azure.RequestFailedException ex)
{
// 並列で実行されている時は発生し得る
// Leaseがとられてる場合は412が返るのでそれは無視。
if (ex.Status != 412)
throw;
}
}
var @lock = new Lock(client.GetBlobLeaseClient());
await @lock.StartAsync(cancellationToken).ConfigureAwait(false);
return @lock;
}
}
public class Lock : IAsyncDisposable
{
private readonly BlobLeaseClient _blobLeaseClient;
private CancellationTokenSource? _cancellationTokenSource = null;
private Task? _task = null;
public Lock(BlobLeaseClient blobLeaseClient)
{
_blobLeaseClient = blobLeaseClient;
}
public async ValueTask StartAsync(CancellationToken cancellationToken)
{
if (_task != null)
{
throw new InvalidOperationException("Already Locked!");
}
while (!cancellationToken.IsCancellationRequested)
{
Azure.Response<BlobLease>? response = null;
try
{
// リース時間は60秒がマックスらしい
response = await _blobLeaseClient.AcquireAsync(TimeSpan.FromSeconds(60), cancellationToken: cancellationToken).ConfigureAwait(false);
}
catch (Azure.RequestFailedException ex)
{
// 既に誰かがリース済みなら409が返ってくる。
// 409ならリースが解放されるまで待ちたいのでthrowしない
if (ex.Status != 409)
throw;
}
if (response != null && response.Value != null && !string.IsNullOrEmpty(response.Value.LeaseId))
{
_cancellationTokenSource = new CancellationTokenSource();
var cts = CancellationTokenSource.CreateLinkedTokenSource(_cancellationTokenSource.Token, cancellationToken);
// 処理が60秒で終わらない可能性があるので、バックグラウンドでリースを更新し続ける。
_task = Task.Run(async () => {
var renewed = true;
while (!cts.Token.IsCancellationRequested)
{
await Task.Delay(TimeSpan.FromSeconds(renewed ? 30 : 1), cts.Token).ConfigureAwait(false);
try
{
await _blobLeaseClient.RenewAsync(cancellationToken: cts.Token).ConfigureAwait(false);
renewed = true;
}
catch
{
renewed = false;
}
}
}, cts.Token);
return;
}
else
{
// 誰かがリース済みなら、10秒待ち
await Task.Delay(TimeSpan.FromSeconds(10)).ConfigureAwait(false);
}
}
}
public async ValueTask FinishAsync(CancellationToken cancellationToken)
{
if (_task != null && _cancellationTokenSource != null)
{
_task = null;
_cancellationTokenSource.Cancel();
await _blobLeaseClient.ReleaseAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
}
}
public async ValueTask DisposeAsync()
{
await FinishAsync(CancellationToken.None).ConfigureAwait(false);
}
}
使い方
await using var @lock = await _lockService.Lock($"path/to/the/{id}.lock", CancellationToken.None);
// 以降はロックが取れてる状態
このセルフロックの問題点は、BlobInputとかのInputバインドが使えなくなること。 ロックが取れた後に自分でブロブやらテーブルから取得しないと、古いデータで処理することになってしまう。 だからほんとは関数実行前にホスト側でやってほしい。
注意点 ロックによって確保されるのは、同じリースIDが同時実行されないということだけなので、実行される順序自体は不定。 だからステートを内包しているキューでそのステートに順序性があるものには使用できない。 まあそれは従来のFunctionでも一緒のはず。