続々々Azure Functions IsolatedでSingleton

これが、

こうなり、

こうなった。

最終的に、CQRS用のライブラリを作りに至ったわけだけど、

課題としては、トリガー用のStorage QueueとBlobの中に作ったコマンドキューとの間でキューの長さがずれてしまった場合、トリガーが全部終わってもコマンドキューの中に処理されないコマンドが残ってしまう可能性がある。

という課題が残っていた。

これをどうしようかなあと思っていて、ずっと放置していたのでこの週末実装した。

アイディアとしては、キューが同一トランザクションで書き込みできないStorageBlobとStorageQueueに分離してしまうからズレる可能性が生まれるわけで、Blobに書いたものを拾って次につなげればいいはず※1。

こういうの何っていうんだっけ?分散トランザクションじゃなくて違う言い方があったと思うんだけどなあ。思い出せない。

基本的な使い方は変わっていないが、StorageQueueを繋ぎに使わないとこだけ違う。

public class InvokeCommand
{
    private readonly Commander _commander;

    public InvokeCommand(Commander commander)
    {
        _commander = commander;
    }

    [Function("InvokeCommand")]
    public async Task<HttpResponseData> RunAsync(
        [HttpTrigger(AuthorizationLevel.Anonymous, "post"] HttpRequestData req,
        string command,
        CancellationToken cancellationToken)
    {
        string topic = null;
        string user = null;
        var id = await _commander.EnqueueAsync(topic, user, command, req.Body, cancellationToken);

        var res = req.CreateResponse(HttpStatusCode.OK);

        await res.WriteAsJsonAsync(new { id });

        return res;
    }
}

QueueTriggerではなく、BlobTriggerを使う。ローカルだとEventGridが受け取れないのでのでプリプロセッサでトリガーソースを変えておくと楽。

public class ExecuteCommand
{
    public const string BLOB_TRIGGER_PREFIX = "commands/queue";
    public const BlobTriggerSource BLOB_TRIGGER_SOURCE = 
#if Release
        BlobTriggerSource.EventGrid;
#else
        BlobTriggerSource.LogsAndContainerScan;
#endif
    private readonly Commander<Services.InvokeState> _commander;
    private readonly ICommandStoragePathResolver _pathResolver;
    private readonly LockService _lockService;

    public ExecuteCommand(
        Commander<Services.InvokeState> commander, 
        ICommandStoragePathResolver pathResolver, 
        LockService lockService) 
    {
        _commander = commander;
        _pathResolver = pathResolver;
        _lockService = lockService;
    }

    [Function("ExecuteCommand")]
    public async Task RunAsync(
        [BlobTrigger($"{BLOB_TRIGGER_PREFIX}/{{path}}")]BlobClient blob, 
        CancellationToken cancellationToken)
    {
        // BlobTriggerでBlobのパスを拾ってそこからInvokeRequestを取り出す
        if (!_pathResolver.TryParseQueue(blob.Name, out var trigger))
            return;

        // せっかく作ったSingletonMiddlewareが
        // 結局明示的に呼び出す形に戻ってしまったのは致し方なし
        await using var @lock = await _lockService.Lock(_pathResolver.GetQueueDirPath(trigger.Partition), cancellationToken);

        var req = await _commander.PeekAsync(trigger.Partition, cancellationToken);

        if (req == null)
            return;

        var state = await _commander.GetStateAsync(req.Id, cancellationToken);

        await _commander.SetStateAsync(state, cancellationToken);

        try
        {
            await _commander.InvokeAsync(req, cancellationToken);

            state.Status = InvokeStatus.Succeeded;
            await _commander.SetStateAsync(state, cancellationToken);
            await _commander.RemoveAsync(req, cancellationToken);
        }
        catch(Exception ex){
            state.Status = InvokeStatus.Failed;
            state.Details = ex.Message;
            await _commander.SetStateAsync(state, cancellationToken);
            await _commander.RemoveAsync(req, cancellationToken);
        }
    }
}

これなら、StorageBlobに書いたけどStorageQueueにかけなかったってことがなくなり、Blobの中のキューとStorageQueueのキューの長さが変わることはないはず。

EventGridはBlobTriggerじゃなくて、StorageQueueにも接続できるので、QueueTriggerは残したまま、StorageQueueへのエンキューをEventGridに任せるという方法もとれる。
それならpoisonからの復帰が楽かも。

※1そもそも、CosmosDBのChangeTriggerとかのほうがいいんだろうけど、あえてStorageのみでやることを前提にしてる。一応あげられる最もらしい理由としては、Blobなら1GBのCSVファイルとかをCommandペイロードに一緒に保存できるとか。


You'll only receive email when they publish something new.

More from iwate
All posts