import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
 * 异步发送
 */
public class AsyncProducer {
    public static void main(
            String[] args) throws MQClientException, InterruptedException {
        //生产者实例化
        DefaultMQProducer producer = new DefaultMQProducer("async");
        //指定rocket服务器地址
        producer.setNamesrvAddr("zjj101:9876");
        //启动实例
        producer.start();
        //发送异步失败时的重试次数(这里不重试)
        //这就参数设置为0就是不重试,设置成1的话,如果发送失败了最多重试一次.
        producer.setRetryTimesWhenSendAsyncFailed(0);
        int messageCount = 10;
        final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
        for (int i = 0; i < messageCount; i++) {
            try {
                final int index = i;
                Message msg = new Message("TopicTest",
                        "TagC",
                        "OrderID"+index,
                        ("Hello world "+index).getBytes(RemotingHelper.DEFAULT_CHARSET));
                //生产者异步发送
                producer.send(msg, new SendCallback() {
                    
                    public void onSuccess(SendResult sendResult) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d OK %s %n", index, new String(msg.getBody()));
                    }
                    
                    public void onException(Throwable e) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        countDownLatch.await(5, TimeUnit.SECONDS);
        producer.shutdown();
    }
}                
                










