Question

Looking for an example of using a NEST async method

I want to index a large number of records, maybe 100,000. I plan to use IndexManyAsync, but I don't know much about async programming. IndexManyAsync returns a Task, so I am looking for an example of how to use it.

The non-async approach I had in mind was: take 50 records, index them, take the next 50 records, index them, and so on.

With async, should I call IndexManyAsync once with all the records, or still send 50 at a time? Do I call the method, get the Task back, and then call Task.Start? Do I build a collection of tasks and then run them all at once?

Looking for some guidance.


Solution

There is an example of using IndexManyAsync in the NEST Profiling.Indexing.Tester.cs class in the source repository. This should get you started in the right direction.
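
The following is not the code from that class. It is only a minimal sketch of the batching pattern the question asks about: split the records into pages, start one async call per page, and await them all together. The names BatchIndexer, Batch, IndexAllAsync and the indexBatchAsync delegate are hypothetical; the delegate stands in for the real client call (for example IndexManyAsync), whose exact signature depends on the NEST version.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class BatchIndexer
{
    // Splits items into consecutive batches of at most batchSize elements.
    public static IEnumerable<IReadOnlyList<T>> Batch<T>(IEnumerable<T> items, int batchSize)
    {
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));
        var batch = new List<T>(batchSize);
        foreach (var item in items)
        {
            batch.Add(item);
            if (batch.Count == batchSize)
            {
                yield return batch;
                batch = new List<T>(batchSize);
            }
        }
        if (batch.Count > 0) yield return batch; // final, partial batch
    }

    // Starts one call per batch, then awaits all of them together.
    // indexBatchAsync is a hypothetical stand-in for the real client call.
    public static async Task<TResponse[]> IndexAllAsync<T, TResponse>(
        IEnumerable<T> items,
        int batchSize,
        Func<IReadOnlyList<T>, Task<TResponse>> indexBatchAsync)
    {
        // ToList() forces every call to start now; each returned task is already running.
        var tasks = Batch(items, batchSize).Select(batch => indexBatchAsync(batch)).ToList();
        return await Task.WhenAll(tasks);
    }
}

public static class Program
{
    public static async Task Main()
    {
        var items = Enumerable.Range(1, 105);

        // Fake indexer: waits briefly, then reports how many items it received.
        var sizes = await BatchIndexer.IndexAllAsync(items, 50, async batch =>
        {
            await Task.Delay(10);
            return batch.Count;
        });

        Console.WriteLine(string.Join(", ", sizes)); // 50, 50, 5
    }
}
```

Note that this sketch starts every batch at once. With 100,000 records and 50 per batch that is 2,000 requests in flight, so in practice you would probably want to limit how many run concurrently.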

Other answers

So, I created a method, based on that test:

    public IEnumerable<IBulkResponse> ExportAllProducts(string indexName, int? productsCount)
    {
        var allTasks = ExportBulkProducts(indexName,productsCount);

        var results = new List<IBulkResponse>();
        try
        {
            var tokenSource2 = new CancellationTokenSource();
            var cancellationToken = tokenSource2.Token;
            Task.WaitAll(allTasks.ToArray<Task>(), cancellationToken);

            results = allTasks.Select(response => response.Result).ToList();
        }
        catch (AggregateException e)
        {
            var messages = new List<String>();
            messages.AddRange(e.InnerExceptions.Select(v => e.Message + " " + v.Message));
            throw new CustomException(messages);
        }
        return results;
    }

    /// <summary>
    /// Generate the tasks needed to run
    /// </summary>
    /// <param name="indexName">The index name</param>
    /// <param name="productsCount">The number of products to index</param>
    /// <returns>List of Tasks</returns>
    private IList<Task<IBulkResponse>> ExportBulkProducts(string indexName, int? productsCount)
    {
        var allTasks = new List<Task<IBulkResponse>>();
        using (var productRepository = new ProductRepository(new SearchContext()))
        {
            var totalProducts = productsCount ?? productRepository.TotalProducts();

            // Index in pages of at most 50 products.
            const int maxPageSize = 50;

            // Step by offset so that a final, partial page is not dropped
            // (integer division of the total by the page size would skip it).
            for (var offset = 0; offset < totalProducts; offset += maxPageSize)
            {
                var pageSize = Math.Min(maxPageSize, totalProducts - offset);
                var products = productRepository.SelectAllProducts(pageSize, offset);
                // IndexAsync starts the request immediately; the returned task is already running.
                var response = Client.IndexAsync(indexName, typeof(ElasticSearchProduct), products, true);
                allTasks.Add(response);
            }
        }
        return allTasks;
    }

This technically works, in that I get testable results, but I am not sure it is actually running asynchronously. When I step through the code, each call to IndexAsync returns a Task, which I add to a list of tasks to be run together. However, the fields of each task show that it has already run, so when I wait on all the tasks at once:

Task.WaitAll(allTasks.ToArray<Task>(), cancellationToken);

It takes no time at all because all the tasks have already run. Maybe this is correct? I had thought it would wait and then run all the tasks together.
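
For what it's worth, this behavior is expected in .NET: a task returned by an async method is "hot", meaning the work began when the method was called. Task.WaitAll only waits for tasks to finish; it does not start them. While stepping through in the debugger, the earlier requests have plenty of time to complete before the wait is reached. The sketch below illustrates this with a hypothetical FakeIndexAsync method that stands in for the real client call; it is not NEST code.

```csharp
using System;
using System.Threading.Tasks;

public static class HotTaskDemo
{
    // Hypothetical stand-in for an async client call such as IndexAsync.
    public static async Task<string> FakeIndexAsync(int page)
    {
        await Task.Delay(100); // simulated network round trip
        return "page " + page + " indexed";
    }

    public static async Task Main()
    {
        // Calling the method starts the work; no separate "run" step exists.
        Task<string> task = FakeIndexAsync(0);

        // A task from an async method is never in the Created (not yet started) state.
        Console.WriteLine(task.Status != TaskStatus.Created); // True

        try
        {
            task.Start(); // invalid for a task that was not created with new Task(...)
        }
        catch (InvalidOperationException)
        {
            Console.WriteLine("Start() is not needed and throws");
        }

        // Awaiting (or Task.WaitAll) only waits for completion.
        Console.WriteLine(await task); // page 0 indexed
    }
}
```

So there is no need to call Task.Start on the tasks returned by the client: collect them and wait on (or await) the whole collection.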

Licensed under: CC-BY-SA with attribution