您好, 欢迎来到 !    登录 | 注册 | | 设为首页 | 收藏本站

带有NEST的Elasticsearch批量插入返回es_rejected_execution_exception

带有NEST的Elasticsearch批量插入返回es_rejected_execution_exception

正如Val在评论中所说,您一次发送的数据可能会超出集群的处理能力。看来您可能正在尝试在 *

使用时_bulk,除了可以同时发送到集群的批量请求数量外,您还需要将数据以多个批量请求的形式发送到集群,并找到每个批量请求中可以发送的 最佳 文档数。

此处没有确定最佳大小的硬性规定,因为它取决于文档的复杂性,分析方式,集群硬件,集群设置,索引设置等而有所不同。

最好的做法是从一个合理的数字开始,例如在一个请求中说500个文档(或在您的上下文中有意义的某个数字),然后从那里开始。计算每个批量请求的总大小(以字节为单位)也是一种不错的方法。如果性能和吞吐量不足,请增加文档数量,请求字节大小或并发请求,直到您开始看到es_rejected_execution_exception

NEST 5.x附带了一个方便的助手,使用IObservable<T>和可观察的设计模式使批量请求变得更加容易

void Main()
{
    var client = new ElasticClient();

    // can cancel the operation by calling .Cancel() on this
    var cancellationTokenSource = new CancellationTokenSource();

    // set up the bulk all observable
    var bulkAllObservable = client.BulkAll(GetDocuments(), ba => ba
        // number of concurrent requests
        .MaxDegreeOfParallelism(8)
        // in case of 429 response, how long we should wait before retrying
        .BackOffTime(TimeSpan.FromSeconds(5))
        // in case of 429 response, how many times to retry before failing
        .BackOffRetries(2)
        // number of documents to send in each request
        .Size(500)
        .Index("index-name")
        .RefreshOnCompleted(),
        cancellationTokenSource.Token
    );

    var waitHandle = new ManualResetEvent(false);
    Exception ex = null;

    // what to do on each call, when an exception is thrown, and 
    // when the bulk all completes
    var bulkAllObserver = new BulkAllObserver(
        onNext: bulkAllResponse =>
        {
            // do something after each bulk request
        },
        onError: exception =>
        {
            // do something with exception thrown
            ex = exception;
            waitHandle.Set();
        },
        onCompleted: () =>
        {
            // do something when all bulk operations complete
            waitHandle.Set();
        });

    bulkAllObservable.Subscribe(bulkAllObserver);

    // wait for handle to be set.
    waitHandle.WaitOne();

    if (ex != null)
    {
        throw ex;
    }
}

// Getting documents should be lazily enumerated collection ideally
public static IEnumerable<Document> GetDocuments()
{
    return Enumerable.Range(1, 10000).Select(x =>
        new Document
        {
            Id = x,
            Name = $"Document {x}" 
        }
    );
}

public class Document
{
    public int Id { get; set; }
    public string Name { get; set; }
}
其他 2022/1/1 18:19:49 有426人围观

撰写回答


你尚未登录,登录后可以

和开发者交流问题的细节

关注并接收问题和回答的更新提醒

参与内容的编辑和改进,让解决方法与时俱进

请先登录

推荐问题


联系我
置顶