Skip to content

Suggest to add a chunkBy function (to group and yield adjacent items sharing the same key) to the library #156

@natalie-o-perret

Description

@natalie-o-perret

Description

Bumped into this at work the other day, and thought this could be a nice addition to the library (unless it's already part of it under another name which wouldn't be too surprising since I'm pretty good at missing out the very obvious).

Basically the idea would be to add a chunkBy function to the AsyncSeq module that allows to group and yield adjacent items sharing the same key in the gist of what has been done for Seq.chunkBy in the F#+ library here, i.e.:

open System
open System.Threading

open FSharpPlus

open FSharp.Control


[<RequireQualifiedAccess>]
module AsyncSeq =
    
    let chunkBy (projection: 'T -> 'Key) (source: AsyncSeq<'T>) = asyncSeq {
        use e = source.GetEnumerator()
        let mutable currentMaybe = None
        let! firstCurrentMaybe = e.MoveNext()
        currentMaybe <- firstCurrentMaybe
        if currentMaybe.IsSome then
            let mutable g = projection currentMaybe.Value
            let mutable members = ResizeArray()
            members.Add currentMaybe.Value
            let! preWhileCurrentMaybe = e.MoveNext()
            currentMaybe <- preWhileCurrentMaybe
            while currentMaybe.IsSome do
                let key = projection currentMaybe.Value
                if g = key then
                    members.Add currentMaybe.Value
                else
                    yield (g, members)
                    g <- key
                    members <- ResizeArray ()
                    members.Add currentMaybe.Value
                let! whileCurrentMaybe = e.MoveNext()
                currentMaybe <- whileCurrentMaybe
            yield (g, members)
    }

let chunkBy (projection: 'T -> 'Key) (source: _ seq) = seq {
    use e = source.GetEnumerator ()
    if e.MoveNext () then
        let mutable g = projection e.Current
        let mutable members = ResizeArray ()
        members.Add e.Current
        while e.MoveNext () do
            let key = projection e.Current
            if g = key then members.Add e.Current
            else
                yield g, members
                g <- key
                members <- ResizeArray ()
                members.Add e.Current
        yield g, members }

A naive impl., ahem translation of above I ended up with:

let chunkBy (projection: 'T -> 'Key) (source: AsyncSeq<'T>) = asyncSeq {
    use e = source.GetEnumerator()
    let mutable currentMaybe = None
    let! firstCurrentMaybe = e.MoveNext()
    currentMaybe <- firstCurrentMaybe
    if currentMaybe.IsSome then
        let mutable g = projection currentMaybe.Value
        let mutable members = ResizeArray()
        members.Add currentMaybe.Value
        let! preWhileCurrentMaybe = e.MoveNext()
        currentMaybe <- preWhileCurrentMaybe
        while currentMaybe.IsSome do
            let key = projection currentMaybe.Value
            if g = key then
                members.Add currentMaybe.Value
            else
                yield (g, members)
                g <- key
                members <- ResizeArray ()
                members.Add currentMaybe.Value
            let! whileCurrentMaybe = e.MoveNext()
            currentMaybe <- whileCurrentMaybe
        yield (g, members)
}

I've checked a bunch of Seq more functional approaches which seem to require more allocations: https://stackoverflow.com/a/38495042/4636721

And here is a working example:

[<EntryPoint>]
let main _ =
    
    let generateCpiEntities() = 
        let alphabet = [| 'a' .. 'z' |]
        seq { 0 .. 13 }
        |> Seq.map (fun x -> Thread.Sleep 500; FakeCpiEntity.Of(x / 3, [ Array.get alphabet x ]))
    
    let pushGroupsToGrpc = String.join "," >> sprintf "Push %A" >> String.replace "\n" String.Empty >> printfn "%s"
    let grpcEntityToSendThreshold = 2
    
    generateCpiEntities()                           
    |> Seq.chunkBy     FakeCpiEntity.ToPrimaryKey
    |> Seq.map         FakeGrpcEntity.OfCpiEntities
    |> Seq.chunkBySize grpcEntityToSendThreshold
    |> Seq.iter        pushGroupsToGrpc

    generateCpiEntities()
    |> AsyncSeq.ofSeq
    |> AsyncSeq.chunkBy     FakeCpiEntity.ToPrimaryKey
    |> AsyncSeq.map         FakeGrpcEntity.OfCpiEntities
    |> AsyncSeq.chunkBySize grpcEntityToSendThreshold
    |> AsyncSeq.iter        pushGroupsToGrpc
    |> Async.RunSynchronously
    
    0

Sample output:

Push "{ Id = 0  Items = ['a'; 'b'; 'c'] },{ Id = 1  Items = ['d'; 'e'; 'f'] }"
Push "{ Id = 2  Items = ['g'; 'h'; 'i'] },{ Id = 3  Items = ['j'; 'k'; 'l'] }"
Push "{ Id = 4  Items = ['m'; 'n'] }"
Push "{ Id = 0  Items = ['a'; 'b'; 'c'] },{ Id = 1  Items = ['d'; 'e'; 'f'] }"
Push "{ Id = 2  Items = ['g'; 'h'; 'i'] },{ Id = 3  Items = ['j'; 'k'; 'l'] }"
Push "{ Id = 4  Items = ['m'; 'n'] }"

(Also, I know [random] vertical alignment is dumb but I can't help it)

Wdyt?
Do you think this feature is worthy-enough🔨⚡ (or just relevant 🤔) to be part of the library?

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions