Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/execution_profile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ CassError cass_execution_profile_set_latency_aware_routing_settings(
CassError cass_execution_profile_set_whitelist_filtering(CassExecProfile* profile,
const char* hosts) {
return cass_execution_profile_set_whitelist_filtering_n(profile, hosts, SAFE_STRLEN(hosts));
return CASS_OK;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one seems like a good idea on it's face

}

CassError cass_execution_profile_set_whitelist_filtering_n(CassExecProfile* profile,
Expand Down
34 changes: 17 additions & 17 deletions src/mpmc_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,10 @@ class MPMCQueue : public Allocated {
// buffer must be a size which is a power of 2. this also allows
// the sequence to double as a ticket/lock.
size_t pos = tail_.load(MEMORY_ORDER_RELAXED);
Node* node;

for (;;) {
Node* node = &buffer_[pos & mask_];
node = &buffer_[pos & mask_];
size_t node_seq = node->seq.load(MEMORY_ORDER_ACQUIRE);
intptr_t dif = (intptr_t)node_seq - (intptr_t)pos;

Expand All @@ -69,11 +70,7 @@ class MPMCQueue : public Allocated {
// weak compare is faster, but can return spurious results
// which in this instance is OK, because it's in the loop
if (tail_.compare_exchange_weak(pos, pos + 1, MEMORY_ORDER_RELAXED)) {
// set the data
node->data = data;
// increment the sequence so that the tail knows it's accessible
node->seq.store(pos + 1, MEMORY_ORDER_RELEASE);
return true;
break;
}
} else if (dif < 0) {
// if seq is less than head seq then it means this slot is
Expand All @@ -85,15 +82,19 @@ class MPMCQueue : public Allocated {
}
}

// never taken
return false;
// set the data
node->data = data;
// increment the sequence so that the tail knows it's accessible
node->seq.store(pos + 1, MEMORY_ORDER_RELEASE);
return true;
}

bool dequeue(T& data) {
size_t pos = head_.load(MEMORY_ORDER_RELAXED);
Node* node;

for (;;) {
Node* node = &buffer_[pos & mask_];
node = &buffer_[pos & mask_];
size_t node_seq = node->seq.load(MEMORY_ORDER_ACQUIRE);
intptr_t dif = (intptr_t)node_seq - (intptr_t)(pos + 1);

Expand All @@ -104,12 +105,7 @@ class MPMCQueue : public Allocated {
// weak compare is faster, but can return spurious results
// which in this instance is OK, because it's in the loop
if (head_.compare_exchange_weak(pos, pos + 1, MEMORY_ORDER_RELAXED)) {
// set the output
data = node->data;
// set the sequence to what the head sequence should be next
// time around
node->seq.store(pos + mask_ + 1, MEMORY_ORDER_RELEASE);
return true;
break;
}
} else if (dif < 0) {
// if seq is less than head seq then it means this slot is
Expand All @@ -121,8 +117,12 @@ class MPMCQueue : public Allocated {
}
}

// never taken
return false;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two seemed a bit more perplexing to me. I'm assuming they exist to make code analysis happy but I'm wondering if maybe it isn't better to do something like the following:

diff --git a/src/mpmc_queue.hpp b/src/mpmc_queue.hpp
index f6cd146c..53abdf5d 100644
--- a/src/mpmc_queue.hpp
+++ b/src/mpmc_queue.hpp
@@ -69,11 +69,7 @@ public:
         // weak compare is faster, but can return spurious results
         // which in this instance is OK, because it's in the loop
         if (tail_.compare_exchange_weak(pos, pos + 1, MEMORY_ORDER_RELAXED)) {
-          // set the data
-          node->data = data;
-          // increment the sequence so that the tail knows it's accessible
-          node->seq.store(pos + 1, MEMORY_ORDER_RELEASE);
-          return true;
+          break;
         }
       } else if (dif < 0) {
         // if seq is less than head seq then it means this slot is
@@ -85,8 +81,11 @@ public:
       }
     }
 
-    // never taken
-    return false;
+    // set the data
+    node->data = data;
+    // increment the sequence so that the tail knows it's accessible
+    node->seq.store(pos + 1, MEMORY_ORDER_RELEASE);
+    return true;
   }
 
   bool dequeue(T& data) {
@@ -104,12 +103,7 @@ public:                                                                                                                                                                                          
         // weak compare is faster, but can return spurious results                                                                                                                                                   
         // which in this instance is OK, because it's in the loop                                                                                                                                                    
         if (head_.compare_exchange_weak(pos, pos + 1, MEMORY_ORDER_RELAXED)) {                                                                                                                                       
-          // set the output                                                                                                                                                                                          
-          data = node->data;                                                                                                                                                                                         
-          // set the sequence to what the head sequence should be next                                                                                                                                               
-          // time around                                                                                                                                                                                             
-          node->seq.store(pos + mask_ + 1, MEMORY_ORDER_RELEASE);                                                                                                                                                    
-          return true;                                                                                                                                                                                               
+          break;                                                                                                                                                                                                     
         }                                                                                                                                                                                                            
       } else if (dif < 0) {                                                                                                                                                                                          
         // if seq is less than head seq then it means this slot is                                                                                                                                                   
@@ -121,8 +115,12 @@ public:                                                                                                                                                                                          
       }                                                                                                                                                                                                              
     }                                                                                                                                                                                                                
                                                                                                                                                                                                                      
-    // never taken                                                                                                                                                                                                   
-    return false;                                                                                                                                                                                                    
+    // set the output                                                                                                                                                                                                
+    data = node->data;                                                                                                                                                                                               
+    // set the sequence to what the head sequence should be next                                                                                                                                                     
+    // time around                                                                                                                                                                                                   
+    node->seq.store(pos + mask_ + 1, MEMORY_ORDER_RELEASE);                                                                                                                                                          
+    return true;                                                                                                                                                                                                     
   }                                                                                                                                                                                                                  
                                                                                                                                                                                                                      
   bool is_empty() const { 

This change should be logically equivalent to the previous impl. It also brings us closer to the original implementation.

Whaddya think @SeverinLeonhardt ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My change is definitely to make code analysis happy, so maybe either way some compiler will complain.

Your suggested changes look good to me and nicely work around it. Actually I'd say this code is even better structured. If I understand GitHub right you should be able to push commits to the PR. Let me know if that doesn't work, then I'll add it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm reading these docs right that's something that can be setup when the PR is created. Honestly it's probably faster/easier for you to incorporate the changes directly in another commit @SeverinLeonhardt. If that works for you let's just go with that; I'll take another look once the changes are in.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@absurdfarce Sorry for the delay, commit is added to the PR now. I've also used the patched version to run some tests locally and it seemed to all work fine.

// set the output
data = node->data;
// set the sequence to what the head sequence should be next
// time around
node->seq.store(pos + mask_ + 1, MEMORY_ORDER_RELEASE);
return true;
}

bool is_empty() const {
Expand Down