S

siamese

(very) Simple Advanced MESage queue Example

Project badge

Archived project! Repository and other project resources are read only

THIS PROJECT HAS BEEN ARCHIVED!

Purpose of the scripts/scenarios

Validate multiple publishers (aka. producers) and multiple consumers working on the same queue, exchange messages via some AMQP message broker without any message corruption. Messages should only be consumed once by a specific allowed consumer.

Best practice?

How does the best practice deployment look like?

  • Have one broker accessibly via VPN (or other means of network layer security) for pure internal use and one for public / cloud?
  • Only one broker instance serving internal as well as public / cloud work load - and if, what kind of security mechanisms are available (authentication / authorization)?

Birds eye view mermaid diagram

graph LR
    subgraph Cloud
        AMQP((Message Queue))
    end

    subgraph OnPrem/Cloud
        Producer1{Producer 1} -- send --> AMQP
        Producer2{Producer 2} -- send --> AMQP
        ProducerX[...] -. send .-> AMQP
        ProducerN{Producer n} -- send --> AMQP
    end

    subgraph OnPrem/Cloud
        AMQP -- receive --> Consumer1(Consumer 1)
        Consumer1 -. "ack" .-> AMQP
        AMQP -- receive --> Consumer2(Consumer 2)
        Consumer2 -. ack .-> AMQP
        AMQP -. receive .-> ConsumerX(...)
        ConsumerX -. ack .-> AMQP
        AMQP -- receive --> ConsumerN(Consumer n)
        ConsumerN -. ack .-> AMQP
    end

The following test cases shall be possible to test

  • 1 producer, n subscribers (typical example: 'newsletter', etc.)
  • n producers and 1 subscriber (typical example: IoT)
  • m producers and n subscribers ((typical?) example: price generation for products sold at whatever PoS)

Scenario 1 - Valid message exchange

  • Scope: Check valid message exchange, cryptographically proven message exchange
  • Out of scope:
    • Performance data
    • Validate if messages are only consumed once (since this is a requirement of MQ)
    • Multiple producers / validators (although it's possible)
  • Algorithm used to prove correct message exchange: Similar to many blockchains, a (simplistic) PoW

Producer

Generate hash over the last produced PoW, last produced proof and new proof, where proof is a prime number (to further increase complexity and unpredictability, this is a random number (between fixed MIN/MAX, however, the resulting prime can be larger than MAX) and the hash starts with n number of 0s (zeros).

Example (Genesis hash)

last_block = {
  'hash': '',
  'proof': 100003
}
# Will generate (eg.):
new_block =  {
  'hash': '00004913ba4dc384985524eda948b9b66d9b43fb9be10c9be55c60a9809fe887',
  'proof': 909451
}

Explanation of process

The hash generating process (aka publisher in MQ language), will send the generated "block" over to the message exchange. On the other side the validating processes (aka consumer in MQ language) will dequeue one message after the other and validate each of these "block". All blocks received on this end must be valid, in order to pass this test.

Flowchart, since "A picture is worth a thousand words"

graph LR
    subgraph Cloud
        AMQP((Message Queue))
    end
    subgraph Cloud/OnPrem
        Producer{Producer} -- send hash --> AMQP
    end
    subgraph Cloud/OnPrem
        AMQP -- "receive (and ack)" --> Validator(Validator)
    end

Scenario 2 - Performance

  • Scope: Check performance, low level validation of results
  • Out of scope: Multiple producers/consumers (although possible)

Producer

The generating process (aka publisher in MQ language), will just send a fixed hash to the message broker. The hash, however, contains, at least, UTF-8 characters, if not UTF-16. Since this is a fixed string, it can still be validated on the other side by the validating processes (aka consumer in MQ language).

Example hash


'testdict': {
  'مفتاح': قيمة', # Arabic; key: value
  '键': u'值', # Chinese; key: value
  'キー': u'値', # Japanese; key: value
}

Explanation of the test

Since the process sends non-ASCII characters over the channel, we can see if the encoding/decoding works correct or not. In theory this will also test if JSON module from Python does it's job correctly, but given our experience, this is usually not the problem.

Scenario 3 - Distribute single message to multiple consumers

  • Scope: Use fanout mechanism in order to distribute a single message to multiple consumers, test message expiration (TTL)
  • Out of scope: n/a

Producer

The producing process sends, every second, a simple 'Hello world' string to the message broker with a expiration time of 10 seconds.

Consumer

The consuming process will connect to the message broker and ask for creation of a new queue (name shall be specifiable on the command line, default: the host name where the script is currently executed) and bind this queue to the exchange the producer created.

Since messages are only valid for 10 seconds, the consuming process will receive only the messages of the last 10 seconds (~ 10 messages).

Flowchart, since "A picture is worth a thousand words"

graph LR
    subgraph Cloud
        AMQP((Message Queue))
    end
    subgraph Cloud/OnPrem
        Producer{Producer} -- "send 'Hello world'" --> AMQP
    end
    subgraph Cloud/OnPrem
        AMQP -- "receive (and ack)" --> Consumer(Consumer)
    end

Scenario 4 - Active message acknowledgement/rejection on the consumer side

  • Scope: Randomly acknowledge, reject or 'loose' (ie. not send ack/reject) messages.
  • Out of scope: n/a

Producer

Every second, send a simple 'Hello world' string to the message broker.

Consumer

As already mentioned in the scope, this process will randomly acknowledge, reject or loose messages.

Acknowledged message

Message is removed from the queue.

Rejected message

Message is re-queued.

Lost message

This is the trickiest problem; generally best described by the 'Two generals problem'. In short: How can the broker know the message is lost if neither an acknowledgement nor a reject is received? The well known and observed (using default configuration of CloudAMQP) behaviour in this case is: As long as the process that received the message is connected, the broker will put the message into the state 'unacknowledged', since it has to guess/expect that the process is still working on that item. As soon as the process disconnects, the broker will re-queue the messages that are in that 'unacknowledged' status. Next time a consumer connects these messages will be available again for the process to be consumed.

Flowchart, since "A picture is worth a thousand words"

graph LR
    subgraph Cloud
        AMQP((Message Queue))
        AMQP -. requeue .-> AMQP
    end
    subgraph Cloud/OnPrem
        Producer{Producer} -- "send 'Hello world'" --> AMQP
    end
    subgraph /dev/null
    Nirvana[Nirvana]
    style Nirvana fill:#ccf,stroke:#f66,stroke-width:2px,stroke-dasharray: 5, 5
    end
    subgraph Cloud/OnPrem
        AMQP -- receive --> Consumer(Consumer)
        Consumer -. acknowledge .-> AMQP
        Consumer -. reject .-> AMQP
        Consumer -. loose .-> Nirvana
    end

Scenario 5 - RPC Example

  • Scope: Test basic RPC
  • Out of scope:
    • Multiple servers or multiple clients
    • Verification if the server sent the correct result

Is is basically a replication of AMQP tutorial 6

Client

In an endless loop, keep increasing a number (starting at 1) and, via RPC over AMQP, ask if it is a prime number (or not), skipping all even numbers, since this is a very cheap calculation.

Server

Return to the client if the number it sent is a prime number or not (True/False).

Flowchart, since "A picture is worth a thousand words"

graph LR
    subgraph Cloud
    subgraph AMQP
        QuestionQueue(Question Queue)
        TempQueue2(Temporary queue for answers)
    end
    end
    subgraph Cloud/OnPrem
        Client{Client} -- "ask if X is prime" --> QuestionQueue
        TempQueue2 -- send result received from server --> Client
    end
    subgraph Cloud/OnPrem
        QuestionQueue -- "get question" --> RPC(RPC Server)
        RPC -- "answer question if X is prime" --> TempQueue2
    end

Scenario 6 - Queue instance not available

Questions:

  • What happens if the queue is not available?
  • What are the concepts/available options in regards to business continuity and (high) availability?
  • Start up stress test: Broker was down, but during start up, producer continuously tries to send messages.

Testing:

  • The producers shall try to send the messages continuously, until the message has been accepted by the broker.

Script

TBD: Shall the script keep trying to send messages or not?

Scenario 7 - Message producer/consumer not available

Questions:

  • Which information is stored in the message queue if subscribers have not been able to pick up messages?
  • Can we report such situation automatically in a dashboard or via e-mail? Especially in cases where a messages haven't been picked up within their TTL

Testing

  • Simple approach: Just run one producer, but no consumers.
  • Complex approach: Run one producer and some (but not all) consumers.
  • More complex approach: Run multiple producers and some (but not all) consumers.

Script

TBD.

Questions

User/Role and privilege concept

  • General introduction to the role - and user concept, including introduction of standard privilege settings (i.e. subscriber, administrator, ...).
  • Authentication/authorization sshall be performed via claims based SAML 2.0

Prioritization and validity of messages (TTL)

  • Is it possible to send messages with higher priority in order to make sure it's delivered as fast as possible, in case there are already several messages in the queue?
  • Is setting TTL on individual messages possible in order specify how long a specific message is valid? Is the message then automatically removed from the queue?

Acknowledgement (fire and forget vs. guaranteed delivery/processing)

  • Show how consumer can acknowledge or not acknowledge message delivery/processing.
  • Is the message queue able to show which consumer has fetched a message and if it has been acknowledged by this specific consumer?
  • Different states shall be available to show the specific state each consumer/producer has; Eg. Sends information, receives information, last connection interrupt, etc.

Script

While basic_ack/_nack can easily shown scripted, the different status a message may eventually have, can only be shown in the dashboard.

Dashboard

The solution should provide a web interface (WUI) where all relevant information can be tracked, like messages received/sent per queue, per day/month/year, errors per queue and so on.

Comparison

All these tests will be done with CloudAMQP as well (separately).

Note: Since we're using the free plan (aka 'Lemur') on CloudAMQP, your mileage may vary.

Deployment of the test scripts

Computing resources

Resource consumption shouldn't be too high. The scripts are not (multi-)[threaded](https://en.wikipedia.org/wiki/Thread_(computing) (See also: Threaded code and will only consume at maximum one (logical) core per (Python) instance. Memory and disk space consumption can be neglected (only a few MB).

Requirements

  • Firewall open for accessing MQ server
    • Depends a bit on the implementation/configuration, but usually AMQP uses 5672/tcp/udp and AMQP over TLS (aka AMQPs) uses 5671/tcp/udp. See also IANA about the officially assigned port numbers.
  • Python 2.x or 3.x
  • Python virtualenv
  • HTTP(s) proxy (for pip)
  • Pika Python AMQP Client Library (aka pika)

Steps to deploy

  • Checkout source via git (if git is not installed: yum install -y git (on RHEL/CentOS))
  • Create Python virtual environment
  • Source virtualenv (See also child section)
  • Install requirements (eg. using pip, since this is known to work; See also child section)
  • Change AMQP URL by exporting shell variable AMQP_URL or edit the code
  • Run as many publishers/consumer as you like (eg. ./producer.py or ./validate.py

Python virtualenv

Usually installed on CentOS/Red Hat Linux, aka RHEL (6 and 7, untested on 5) via:

yum install python-virtualenv

Python virtualenv

Usually this can be installed with:

virtualenv <path/to/virtualenv>

Proxy

In case you need to use a http(s) proxy (for virtualenv/pip), export the following environment variable (bash):

export http_proxy=http://<yourproxy.cmpny.tld>:<port>
export https_proxy=$http_proxy
virtualenv <path/to/virtualenv>

Python requirements

Python requirements are installed via:

pip install -r requirements.txt

PDF version of this Wiki

Latest PDF should be available here (if you are logged in!): https://git.linux-kernel.at/oliver/siamese/-/jobs/artifacts/master/download?job=Wiki2PDF