ここでは、Apache Kafka の Publish-Subscribe API を通じて、専用の外部オブジェクトを使用してメッセージを送受信する手順について説明します。
用意されている外部オブジェクトを使用して、パラメーターを設定し、メッセージを送受信します。実装は次のドキュメントに従っています。
https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html#callout_kafka_consumers__reading_data_from_kafka_CO2-1
Consumer の例として、常時稼動して新しいイベントを待機するデーモンがあります。Consumer メソッドでキューまたはストリームに新しいイベントを問い合わせて、キーと値のコレクションを取得します。
&Consumer.Configuration = "{'bootstrap.servers': 'localhost:9092', 'group.id': 'mi-nuevo-grupo3', 'auto.offset.reset': 'earliest', 'request.timeout.ms': 500}"
&Timeout = 10000
do while (true)
&ListMessagingResponse.Clear()
&Consumer.Consume(!"Topic1", &Timeout, &ListMessagingResponse) //&ListMessagingResponse はキー、値、トピック、エラーの情報のコレクション
PrintResponse(&ListMessagingResponse)
enddo
Producer
Producer は非同期であるため、ストリームに大量のイベントを追加します。この外部オブジェクトには、すべてのイベントがストリームに追加されるまで待機する Finish メソッドがあります。
&Producer.Configuration = "{'bootstrap.servers': 'localhost:9092', 'default.topic.config': {'message.timeout.ms': 10000}}"
do while(true)
&ListMessagingResponse.Clear()
&Text = GetTextToSend() // &Text は送信されたメッセージの文字列
&OK = &Producer.ProduceAsync("Topic1", "key_1" ,&Text )
&ListMessagingResponse = &Producer.Finish(1500) //メッセージ送信が完了するまで最大で 1.5 秒待機
if (&ListMessagingResponse.Count <> 1)
msg("Some error ocurred", status)
endif
PrintResponse(&ListMessagingResponse)
enddo
1.- 次の記事の手順に従います。
https://devops.profitbricks.com/tutorials/install-and-configure-apache-kafka-on-ubuntu-1604-1/
2.- kafka<バージョン>/config/server.properties ファイルに次の行を追加して設定を行います: advertised.host.name = <IP マシン>
3.- Windows からテストする場合は、http://www.kafkatool.com/ をダウンロードできます。
|