0
点赞
收藏
分享

微信扫一扫

Azure Service Bus 实现定时器以及按序执行任务

半夜放水 2022-01-07 阅读 24
azurec#
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using System.Collections;
using Newtonsoft.Json.Linq;
using System.Collections.Generic;

namespace ServiceBusConsole
{
internal class Program
{
// connection string to your Service Bus namespace
static string connectionString = "Endpoint=sb://";

// name of your Service Bus topic
static string topicName = "actieue";

static Queue<string> a;
// number of messages to be sent to the topic
private const int taskNumber = 8;

static void Main(string[] args)
{
Random random = new Random();
a = new Queue<string>();
for(int i = 0; i < taskNumber; i++)
{
JObject jo = new JObject();
jo["taskId"] = i;
jo["excutionTime"] = random.Next(10);
a.Enqueue(jo.ToString());
}
ArrayList b = new ArrayList();
b.Add(a.Dequeue());
sendMessage(b).Wait();
get().Wait();
}
public static async Task sendMessage(ArrayList msgs)
{
// the client that owns the connection and can be used to create senders and receivers
ServiceBusClient client;

// the sender used to publish messages to the topic
ServiceBusSender sender;

// The Service Bus client types are safe to cache and use as a singleton for the lifetime
// of the application, which is best practice when messages are being published or read
// regularly.
//
// Create the clients that we'll use for sending and processing messages.
client = new ServiceBusClient(connectionString);
sender = client.CreateSender(topicName);

// create a batch
ServiceBusMessageBatch messageBatch = await sender.CreateMessageBatchAsync();

foreach (Object o in msgs)
{
string msg = (string)o;
// try adding a message to the batch
ServiceBusMessage m = new ServiceBusMessage(msg);
JObject j = JObject.Parse(msg);
double d = double.Parse(j["excutionTime"].ToString());
m.ScheduledEnqueueTime = DateTimeOffset.Now.AddSeconds(d);
if (!messageBatch.TryAddMessage(m))
{
// if it is too large for the batch
throw new Exception(
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using System.Collections;
using Newtonsoft.Json.Linq;
using System.Collections.Generic;

namespace ServiceBusConsole
{
    internal class Program
    {
        // connection string to your Service Bus namespace
        static string connectionString = "Endpoint=sb://";

        // name of your Service Bus topic
        static string topicName = "actieue";

        static Queue<string> a;
        // number of messages to be sent to the topic
        private const int taskNumber = 8;

        static void Main(string[] args)
        {
            Random random = new Random();
            a = new Queue<string>();
            for(int i = 0; i < taskNumber; i++)
            {
                JObject jo = new JObject();
                jo["taskId"] = i;
                jo["excutionTime"] = random.Next(10);
                a.Enqueue(jo.ToString());
            }
            ArrayList b = new ArrayList();
            b.Add(a.Dequeue());
            sendMessage(b).Wait();
            get().Wait();
        }
        public static async Task sendMessage(ArrayList msgs)
        {
            // the client that owns the connection and can be used to create senders and receivers
            ServiceBusClient client;

            // the sender used to publish messages to the topic
            ServiceBusSender sender;

            // The Service Bus client types are safe to cache and use as a singleton for the lifetime
            // of the application, which is best practice when messages are being published or read
            // regularly.
            //
            // Create the clients that we'll use for sending and processing messages.
            client = new ServiceBusClient(connectionString);
            sender = client.CreateSender(topicName);

            // create a batch 
            ServiceBusMessageBatch messageBatch = await sender.CreateMessageBatchAsync();

            foreach (Object o in msgs)
            {
                string msg = (string)o;
                // try adding a message to the batch
                ServiceBusMessage m = new ServiceBusMessage(msg);
                JObject j = JObject.Parse(msg);
                double d = double.Parse(j["excutionTime"].ToString());
                m.ScheduledEnqueueTime = DateTimeOffset.Now.AddSeconds(d);
                if (!messageBatch.TryAddMessage(m))
                {
                    // if it is too large for the batch
                    throw new Exception($"The message msg is too large to fit in the batch.");
                }
            }

            try
            {
                // Use the producer client to send the batch of messages to the Service Bus topic
                await sender.SendMessagesAsync(messageBatch);
            }
            finally
            {
                // Calling DisposeAsync on client types is required to ensure that network
                // resources and other unmanaged objects are properly cleaned up.
                messageBatch.Dispose();
                await sender.DisposeAsync();
                await client.DisposeAsync();
            }
        }
        public static async Task get()
        {
            ServiceBusClient client;
            ServiceBusProcessor processor;
            client = new ServiceBusClient(connectionString);
            processor = client.CreateProcessor(topicName, new ServiceBusProcessorOptions());

            try
            {
                // add handler to process messages
                processor.ProcessMessageAsync += MessageHandler;

                // add handler to process any errors
                processor.ProcessErrorAsync += ErrorHandler;

                // start processing 
                await processor.StartProcessingAsync();

                Console.WriteLine("Start Task 0 is runing");
                Console.WriteLine($"It is {DateTimeOffset.Now} now");
                Console.ReadKey();

                // stop processing 
                Console.WriteLine("\nStopping the receiver...");
                await processor.StopProcessingAsync();
                Console.WriteLine("Stopped receiving messages");
            }
            finally
            {
                // Calling DisposeAsync on client types is required to ensure that network
                // resources and other unmanaged objects are properly cleaned up.
                await processor.DisposeAsync();
                await client.DisposeAsync();
            }

        }

        static async Task MessageHandler(ProcessMessageEventArgs args)
        {
            ArrayList c = new ArrayList();
            string body = args.Message.Body.ToString();
            try
            {
                c.Add(a.Dequeue());
                sendMessage(c).Wait();
            }
            catch (Exception ex)
            {

            }

            Console.WriteLine($"At {DateTimeOffset.Now},This task is finished:\nReceived: {body} ");

            // complete the message. messages is deleted from the subscription. 
            await args.CompleteMessageAsync(args.Message);
        }

        // handle any errors when receiving messages
        static Task ErrorHandler(ProcessErrorEventArgs args)
        {
            Console.WriteLine(args.Exception.ToString());
            return Task.CompletedTask;
        }
    }
}
quot;The message msg is too large to fit in the batch."
);
}
}

try
{
// Use the producer client to send the batch of messages to the Service Bus topic
await sender.SendMessagesAsync(messageBatch);
}
finally
{
// Calling DisposeAsync on client types is required to ensure that network
// resources and other unmanaged objects are properly cleaned up.
messageBatch.Dispose();
await sender.DisposeAsync();
await client.DisposeAsync();
}
}
public static async Task get()
{
ServiceBusClient client;
ServiceBusProcessor processor;
client = new ServiceBusClient(connectionString);
processor = client.CreateProcessor(topicName, new ServiceBusProcessorOptions());

try
{
// add handler to process messages
processor.ProcessMessageAsync += MessageHandler;

// add handler to process any errors
processor.ProcessErrorAsync += ErrorHandler;

// start processing
await processor.StartProcessingAsync();

Console.WriteLine("Start Task 0 is runing");
Console.WriteLine(
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using System.Collections;
using Newtonsoft.Json.Linq;
using System.Collections.Generic;

namespace ServiceBusConsole
{
    internal class Program
    {
        // connection string to your Service Bus namespace
        static string connectionString = "Endpoint=sb://";

        // name of your Service Bus topic
        static string topicName = "actieue";

        static Queue<string> a;
        // number of messages to be sent to the topic
        private const int taskNumber = 8;

        static void Main(string[] args)
        {
            Random random = new Random();
            a = new Queue<string>();
            for(int i = 0; i < taskNumber; i++)
            {
                JObject jo = new JObject();
                jo["taskId"] = i;
                jo["excutionTime"] = random.Next(10);
                a.Enqueue(jo.ToString());
            }
            ArrayList b = new ArrayList();
            b.Add(a.Dequeue());
            sendMessage(b).Wait();
            get().Wait();
        }
        public static async Task sendMessage(ArrayList msgs)
        {
            // the client that owns the connection and can be used to create senders and receivers
            ServiceBusClient client;

            // the sender used to publish messages to the topic
            ServiceBusSender sender;

            // The Service Bus client types are safe to cache and use as a singleton for the lifetime
            // of the application, which is best practice when messages are being published or read
            // regularly.
            //
            // Create the clients that we'll use for sending and processing messages.
            client = new ServiceBusClient(connectionString);
            sender = client.CreateSender(topicName);

            // create a batch 
            ServiceBusMessageBatch messageBatch = await sender.CreateMessageBatchAsync();

            foreach (Object o in msgs)
            {
                string msg = (string)o;
                // try adding a message to the batch
                ServiceBusMessage m = new ServiceBusMessage(msg);
                JObject j = JObject.Parse(msg);
                double d = double.Parse(j["excutionTime"].ToString());
                m.ScheduledEnqueueTime = DateTimeOffset.Now.AddSeconds(d);
                if (!messageBatch.TryAddMessage(m))
                {
                    // if it is too large for the batch
                    throw new Exception($"The message msg is too large to fit in the batch.");
                }
            }

            try
            {
                // Use the producer client to send the batch of messages to the Service Bus topic
                await sender.SendMessagesAsync(messageBatch);
            }
            finally
            {
                // Calling DisposeAsync on client types is required to ensure that network
                // resources and other unmanaged objects are properly cleaned up.
                messageBatch.Dispose();
                await sender.DisposeAsync();
                await client.DisposeAsync();
            }
        }
        public static async Task get()
        {
            ServiceBusClient client;
            ServiceBusProcessor processor;
            client = new ServiceBusClient(connectionString);
            processor = client.CreateProcessor(topicName, new ServiceBusProcessorOptions());

            try
            {
                // add handler to process messages
                processor.ProcessMessageAsync += MessageHandler;

                // add handler to process any errors
                processor.ProcessErrorAsync += ErrorHandler;

                // start processing 
                await processor.StartProcessingAsync();

                Console.WriteLine("Start Task 0 is runing");
                Console.WriteLine($"It is {DateTimeOffset.Now} now");
                Console.ReadKey();

                // stop processing 
                Console.WriteLine("\nStopping the receiver...");
                await processor.StopProcessingAsync();
                Console.WriteLine("Stopped receiving messages");
            }
            finally
            {
                // Calling DisposeAsync on client types is required to ensure that network
                // resources and other unmanaged objects are properly cleaned up.
                await processor.DisposeAsync();
                await client.DisposeAsync();
            }

        }

        static async Task MessageHandler(ProcessMessageEventArgs args)
        {
            ArrayList c = new ArrayList();
            string body = args.Message.Body.ToString();
            try
            {
                c.Add(a.Dequeue());
                sendMessage(c).Wait();
            }
            catch (Exception ex)
            {

            }

            Console.WriteLine($"At {DateTimeOffset.Now},This task is finished:\nReceived: {body} ");

            // complete the message. messages is deleted from the subscription. 
            await args.CompleteMessageAsync(args.Message);
        }

        // handle any errors when receiving messages
        static Task ErrorHandler(ProcessErrorEventArgs args)
        {
            Console.WriteLine(args.Exception.ToString());
            return Task.CompletedTask;
        }
    }
}
quot;It is {DateTimeOffset.Now} now"
);
Console.ReadKey();

// stop processing
Console.WriteLine("\nStopping the receiver...");
await processor.StopProcessingAsync();
Console.WriteLine("Stopped receiving messages");
}
finally
{
// Calling DisposeAsync on client types is required to ensure that network
// resources and other unmanaged objects are properly cleaned up.
await processor.DisposeAsync();
await client.DisposeAsync();
}

}

static async Task MessageHandler(ProcessMessageEventArgs args)
{
ArrayList c = new ArrayList();
string body = args.Message.Body.ToString();
try
{
c.Add(a.Dequeue());
sendMessage(c).Wait();
}
catch (Exception ex)
{

}

Console.WriteLine(
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using System.Collections;
using Newtonsoft.Json.Linq;
using System.Collections.Generic;

namespace ServiceBusConsole
{
    internal class Program
    {
        // connection string to your Service Bus namespace
        static string connectionString = "Endpoint=sb://";

        // name of your Service Bus topic
        static string topicName = "actieue";

        static Queue<string> a;
        // number of messages to be sent to the topic
        private const int taskNumber = 8;

        static void Main(string[] args)
        {
            Random random = new Random();
            a = new Queue<string>();
            for(int i = 0; i < taskNumber; i++)
            {
                JObject jo = new JObject();
                jo["taskId"] = i;
                jo["excutionTime"] = random.Next(10);
                a.Enqueue(jo.ToString());
            }
            ArrayList b = new ArrayList();
            b.Add(a.Dequeue());
            sendMessage(b).Wait();
            get().Wait();
        }
        public static async Task sendMessage(ArrayList msgs)
        {
            // the client that owns the connection and can be used to create senders and receivers
            ServiceBusClient client;

            // the sender used to publish messages to the topic
            ServiceBusSender sender;

            // The Service Bus client types are safe to cache and use as a singleton for the lifetime
            // of the application, which is best practice when messages are being published or read
            // regularly.
            //
            // Create the clients that we'll use for sending and processing messages.
            client = new ServiceBusClient(connectionString);
            sender = client.CreateSender(topicName);

            // create a batch 
            ServiceBusMessageBatch messageBatch = await sender.CreateMessageBatchAsync();

            foreach (Object o in msgs)
            {
                string msg = (string)o;
                // try adding a message to the batch
                ServiceBusMessage m = new ServiceBusMessage(msg);
                JObject j = JObject.Parse(msg);
                double d = double.Parse(j["excutionTime"].ToString());
                m.ScheduledEnqueueTime = DateTimeOffset.Now.AddSeconds(d);
                if (!messageBatch.TryAddMessage(m))
                {
                    // if it is too large for the batch
                    throw new Exception($"The message msg is too large to fit in the batch.");
                }
            }

            try
            {
                // Use the producer client to send the batch of messages to the Service Bus topic
                await sender.SendMessagesAsync(messageBatch);
            }
            finally
            {
                // Calling DisposeAsync on client types is required to ensure that network
                // resources and other unmanaged objects are properly cleaned up.
                messageBatch.Dispose();
                await sender.DisposeAsync();
                await client.DisposeAsync();
            }
        }
        public static async Task get()
        {
            ServiceBusClient client;
            ServiceBusProcessor processor;
            client = new ServiceBusClient(connectionString);
            processor = client.CreateProcessor(topicName, new ServiceBusProcessorOptions());

            try
            {
                // add handler to process messages
                processor.ProcessMessageAsync += MessageHandler;

                // add handler to process any errors
                processor.ProcessErrorAsync += ErrorHandler;

                // start processing 
                await processor.StartProcessingAsync();

                Console.WriteLine("Start Task 0 is runing");
                Console.WriteLine($"It is {DateTimeOffset.Now} now");
                Console.ReadKey();

                // stop processing 
                Console.WriteLine("\nStopping the receiver...");
                await processor.StopProcessingAsync();
                Console.WriteLine("Stopped receiving messages");
            }
            finally
            {
                // Calling DisposeAsync on client types is required to ensure that network
                // resources and other unmanaged objects are properly cleaned up.
                await processor.DisposeAsync();
                await client.DisposeAsync();
            }

        }

        static async Task MessageHandler(ProcessMessageEventArgs args)
        {
            ArrayList c = new ArrayList();
            string body = args.Message.Body.ToString();
            try
            {
                c.Add(a.Dequeue());
                sendMessage(c).Wait();
            }
            catch (Exception ex)
            {

            }

            Console.WriteLine($"At {DateTimeOffset.Now},This task is finished:\nReceived: {body} ");

            // complete the message. messages is deleted from the subscription. 
            await args.CompleteMessageAsync(args.Message);
        }

        // handle any errors when receiving messages
        static Task ErrorHandler(ProcessErrorEventArgs args)
        {
            Console.WriteLine(args.Exception.ToString());
            return Task.CompletedTask;
        }
    }
}
quot;At {DateTimeOffset.Now},This task is finished:\nReceived: {body} "
);

// complete the message. messages is deleted from the subscription.
await args.CompleteMessageAsync(args.Message);
}

// handle any errors when receiving messages
static Task ErrorHandler(ProcessErrorEventArgs args)
{
Console.WriteLine(args.Exception.ToString());
return Task.CompletedTask;
}
}
}
举报

相关推荐

0 条评论