続々Azure Functions IsolatedでSingleton

dotnet isolatedなAzure FunctionsでどうしてもSingletonしたいシリーズ。

前回こうやればよさそうってとこまでは考えた。

今回は、これを実行するためのライブラリを書いた。

1つは、Azure Functionの実行をシングルトンになるようにせき止めるためのミドルウェア。

使い方は、起動時にミドルウェアを差し込んで、

var host = new HostBuilder()
    .ConfigureFunctionsWorkerDefaults(app => {
        app.UseMiddleware<Iwate.AzureFunctions.Middlewares.Singleton.SingletonMiddleware>();
    })
    .Build();
host.Run();

シングルトンにしたい関数に属性をつける。

public class SingletonFunction
{
    [Function("SingletonFunction")]
    [Singleton]
    public async Task RunAsync([QueueTrigger(QUEUE_NAME)]string message)
    {
        ...
    }
}

トピック別にしたいときは、引数にプロパティ名を渡す。BindingDataにその名前の値があればそれをキーにしてトピックごとにシングルトンになる。

public class SingletonFunction
{
    [Function("SingletonFunction")]
    [Singleton(nameof(Trigger.PartialKey))]
    public async Task RunAsync([QueueTrigger(QUEUE_NAME)]Trigger trigger)
    {
        ...
    }
}
public class Trigger
{
    public string PartialKey { get; set; }
    public string Message { get; set; }
}

2つ目のライブラリは、Blobにキューを作ってペイロードをそのキューに格納するためのもの。

前回、ロックファイルのシングルトンだけだと、同一トピック上で実行順序が変わってしまう現象の解決策として考えたやつで。キューメッセージを起動メッセージとペイロードに分割し、ペイロードは時系列順に並ぶ永続化層にためておいて、起動メッセージを受け取り、ロックが取れた関数がその永続化層から順番にデータを処理していくスタイル。

使い方は、CQRSのコマンドパターンぽい感じ。

まず、コマンドを作る

public class Command001Args
{
}
public class Command001 : ICommand
{
    private RegisterOrganizationCommandArgs _args;
    public Command001(InvokeRequest req)
    {
         _args = JsonSerializer.Deserialize<Command001Args>(req.Payload) 
                             ?? throw new ArgumentException(nameof(req));
    }
    public async Task ExecuteAsync(CancellationToken cancellationToken)
    {
         ...
    }
}

ServiceCollectionにAddCommanderする。この時、作成したコマンドを登録する。

services.AddCommander(builder =>
{
    builder
        .UseAzureBlobCommandStorage()
        .AddCommand<Command001>()
        .AddCommand<Command002>();
});

コマンドを実行するときは、コマンドをリクエストする実体とコマンドを実行する実体が分かれてるイメージ。例えば、HTTPトリガーでコマンドをキューイングして、Queueトリガーが実行する感じ。

まずは、キューイングする方。Commanderに実行したいコマンドのリクエストをキューイングする。

public class InvokeCommand
{
    private readonly Commander _commander;

    public InvokeCommand(Commander commander)
    {
        _commander = commander;
    }

    [Function("InvokeCommand")]
    public async Task<Output> RunAsync(
        [HttpTrigger(AuthorizationLevel.Anonymous, "post"] HttpRequestData req,
        string command,
        CancellationToken cancellationToken)
    {
                string topic = null;
        string user = null;
        var id = await _commander.EnqueueAsync(topic, user, command, req.Body, cancellationToken);

        var res = req.CreateResponse(HttpStatusCode.OK);

        await res.WriteAsJsonAsync(new { id });

        return new Output
        {
            Response = res,
            Trigger = new ExecuteCommand.Trigger { PartialKey = topic },
        };
    }

    public class Output
    {
        public required HttpResponseData Response { get; init; }

        [QueueOutput(ExecuteCommand.QUEUE_NAME)]
        public Workers.ExecuteCommand.Trigger? Trigger { get; init; }
    }
}

次に実行する方。Commanderからピークして先頭のコマンドを取得して実行する。
実行後は手動でキューから削除する。この辺ラップするか悩んだけど、使う側に任せる方向にした。

public class ExecuteCommand
{
    public const string QUEUE_NAME = "executecommands";
    private readonly Commander _commander;

    public ExecuteCommand(Commander<Services.InvokeState> commander)
    {
        _commander = commander;
    }

    [Function("ExecuteCommand")]
    [Singleton(nameof(Trigger.PartialKey))]
    public async Task RunAsync(
        [QueueTrigger(QUEUE_NAME)] Trigger trigger, 
        CancellationToken cancellationToken)
    {
        var req = await _commander.PeekAsync(trigger.PartialKey, cancellationToken);

        if (req == null) {
            return;
        }

        var state = await _commander.GetStateAsync(req.Id, cancellationToken);

        state.Status = InvokeStatus.Processing;
        await _commander.SetStateAsync(state, cancellationToken);

        try
        {
            await _commander.InvokeAsync(req, cancellationToken);

            state.Status = InvokeStatus.Succeeded;
            await _commander.SetStateAsync(state, cancellationToken);
            await _commander.RemoveAsync(req, cancellationToken);
        }
        catch(Exception ex){
            state.Status = InvokeStatus.Failed;
            await _commander.SetStateAsync(state, cancellationToken);
            await _commander.RemoveAsync(req, cancellationToken);
        }
    }

    public class Trigger
    {
        public required string PartialKey { get; set; }
    }
}

ステータスを更新していくとこが野暮ったいけど、ステータスオブジェクトをカスタム可能にしようと思ったらこうなった。

課題としては、トリガー用のStorage QueueとBlobの中に作ったコマンドキューとの間でキューの長さがずれてしまった場合、トリガーが全部終わってもコマンドキューの中に処理されないコマンドが残ってしまう可能性がある。 ワーカーの作り方をもうちょい考えた方がいいかも。例えば1トリガー1Invokeじゃななくて、ループ回して時間の許す限り実行しちゃうとか。


You'll only receive email when they publish something new.

More from iwate
All posts