最近のアクセス:
Kafka Producer および Consumer 外部オブジェクト

ここでは、Apache Kafka の Publish-Subscribe API を通じて、専用の外部オブジェクトを使用してメッセージを送受信する手順について説明します。
用意されている外部オブジェクトを使用して、パラメーターを設定し、メッセージを送受信します。実装は次のドキュメントに従っています。
https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html#callout_kafka_consumers__reading_data_from_kafka_CO2-1

手順

Consumer

Consumer の例として、常時稼動して新しいイベントを待機するデーモンがあります。Consumer メソッドでキューまたはストリームに新しいイベントを問い合わせて、キーと値のコレクションを取得します。
&Consumer.Configuration = "{'bootstrap.servers': 'localhost:9092', 'group.id': 'mi-nuevo-grupo3', 'auto.offset.reset': 'earliest', 'request.timeout.ms': 500}"
&Timeout = 10000
do while (true)
    &ListMessagingResponse.Clear()
    &Consumer.Consume(!"Topic1", &Timeout, &ListMessagingResponse)    //&ListMessagingResponse はキー、値、トピック、エラーの情報のコレクション
    PrintResponse(&ListMessagingResponse)
enddo


Producer

Producer は非同期であるため、ストリームに大量のイベントを追加します。この外部オブジェクトには、すべてのイベントがストリームに追加されるまで待機する Finish メソッドがあります。
&Producer.Configuration = "{'bootstrap.servers': 'localhost:9092', 'default.topic.config': {'message.timeout.ms': 10000}}"
do while(true)
    &ListMessagingResponse.Clear()
    &Text = GetTextToSend()  // &Text は送信されたメッセージの文字列
    &OK = &Producer.ProduceAsync("Topic1", "key_1" ,&Text ) 
    &ListMessagingResponse = &Producer.Finish(1500) //メッセージ送信が完了するまで最大で 1.5 秒待機 
    if (&ListMessagingResponse.Count <> 1) 
        msg("Some error ocurred", status) 
    endif 
    PrintResponse(&ListMessagingResponse) 
enddo

参考情報

補足 - Kafka のテストに関するヒント

1.- 次の記事の手順に従います。
https://devops.profitbricks.com/tutorials/install-and-configure-apache-kafka-on-ubuntu-1604-1/
2.- kafka<バージョン>/config/server.properties ファイルに次の行を追加して設定を行います:
advertised.host.name = <IP マシン>
3.- Windows からテストする場合は、http://www.kafkatool.com/ をダウンロードできます。


サブページ
Created: 19/03/27 20:22 by Admin Last update: 21/05/20 08:02 by Admin
カテゴリ
Powered by GXwiki 3.0